face: send and receive NACK
refs #2930
Change-Id: I70c969ac12b493d2c83fa892beffae936cc23791
diff --git a/src/detail/face-impl.hpp b/src/detail/face-impl.hpp
index fb6642a..98fda53 100644
--- a/src/detail/face-impl.hpp
+++ b/src/detail/face-impl.hpp
@@ -39,6 +39,9 @@
#include "../management/nfd-controller.hpp"
#include "../management/nfd-command-options.hpp"
+#include "../management/nfd-local-control-header.hpp"
+
+#include "../lp/packet.hpp"
namespace ndn {
@@ -74,7 +77,7 @@
satisfyPendingInterests(Data& data)
{
for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
- if ((*entry)->getInterest().matchesData(data)) {
+ if ((*entry)->getInterest()->matchesData(data)) {
shared_ptr<PendingInterest> matchedEntry = *entry;
entry = m_pendingInterestTable.erase(entry);
@@ -87,6 +90,24 @@
}
void
+ nackPendingInterests(const lp::Nack& nack)
+ {
+ for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
+ const Interest& pendingInterest = *(*entry)->getInterest();
+ if (pendingInterest == nack.getInterest()) {
+ shared_ptr<PendingInterest> matchedEntry = *entry;
+
+ entry = m_pendingInterestTable.erase(entry);
+
+ matchedEntry->invokeNackCallback(nack);
+ }
+ else {
+ ++entry;
+ }
+ }
+ }
+
+ void
processInterestFilters(Interest& interest)
{
for (const auto& filter : m_interestFilterTable) {
@@ -111,26 +132,32 @@
}
void
- asyncExpressInterest(const shared_ptr<const Interest>& interest,
- const OnData& onData, const OnTimeout& onTimeout)
+ asyncExpressInterest(shared_ptr<const Interest> interest,
+ const DataCallback& afterSatisfied,
+ const NackCallback& afterNacked,
+ const TimeoutCallback& afterTimeout)
{
this->ensureConnected(true);
auto entry =
m_pendingInterestTable.insert(make_shared<PendingInterest>(interest,
- onData, onTimeout,
+ afterSatisfied,
+ afterNacked,
+ afterTimeout,
ref(m_scheduler))).first;
(*entry)->setDeleter([this, entry] { m_pendingInterestTable.erase(entry); });
- if (!interest->getLocalControlHeader().empty(nfd::LocalControlHeader::ENCODE_NEXT_HOP)) {
- // encode only NextHopFaceId towards the forwarder
- m_face.m_transport->send(interest->getLocalControlHeader()
- .wireEncode(*interest, nfd::LocalControlHeader::ENCODE_NEXT_HOP),
- interest->wireEncode());
+ lp::Packet packet;
+
+ nfd::LocalControlHeader localControlHeader = interest->getLocalControlHeader();
+ if (localControlHeader.hasNextHopFaceId()) {
+ packet.add<lp::NextHopFaceIdField>(localControlHeader.getNextHopFaceId());
}
- else {
- m_face.m_transport->send(interest->wireEncode());
- }
+
+ packet.add<lp::FragmentField>(std::make_pair(interest->wireEncode().begin(),
+ interest->wireEncode().end()));
+
+ m_face.m_transport->send(packet.wireEncode());
}
void
@@ -144,15 +171,40 @@
{
this->ensureConnected(true);
- if (!data->getLocalControlHeader().empty(nfd::LocalControlHeader::ENCODE_CACHING_POLICY)) {
- m_face.m_transport->send(
- data->getLocalControlHeader().wireEncode(*data,
- nfd::LocalControlHeader::ENCODE_CACHING_POLICY),
- data->wireEncode());
+ lp::Packet packet;
+
+ nfd::LocalControlHeader localControlHeader = data->getLocalControlHeader();
+ if (localControlHeader.hasCachingPolicy()) {
+ switch (localControlHeader.getCachingPolicy()) {
+ case nfd::LocalControlHeader::CachingPolicy::NO_CACHE: {
+ lp::CachePolicy cachePolicy;
+ cachePolicy.setPolicy(lp::CachePolicyType::NO_CACHE);
+ packet.add<lp::CachePolicyField>(cachePolicy);
+ break;
+ }
+ default:
+ break;
+ }
}
- else {
- m_face.m_transport->send(data->wireEncode());
- }
+
+ packet.add<lp::FragmentField>(std::make_pair(data->wireEncode().begin(),
+ data->wireEncode().end()));
+
+ m_face.m_transport->send(packet.wireEncode());
+ }
+
+ void
+ asyncPutNack(shared_ptr<const lp::Nack> nack)
+ {
+ this->ensureConnected(true);
+
+ lp::Packet packet;
+ packet.add<lp::NackField>(nack->getHeader());
+
+ Block interest = nack->getInterest().wireEncode();
+ packet.add<lp::FragmentField>(std::make_pair(interest.begin(), interest.end()));
+
+ m_face.m_transport->send(packet.wireEncode());
}
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/detail/pending-interest.hpp b/src/detail/pending-interest.hpp
index 41f6b37..bac5836 100644
--- a/src/detail/pending-interest.hpp
+++ b/src/detail/pending-interest.hpp
@@ -28,31 +28,32 @@
#include "../util/time.hpp"
#include "../util/scheduler.hpp"
#include "../util/scheduler-scoped-event-id.hpp"
+#include "../lp/nack.hpp"
namespace ndn {
class PendingInterest : noncopyable
{
public:
- typedef function<void(const Interest&, Data&)> OnData;
- typedef function<void(const Interest&)> OnTimeout;
-
/**
* @brief Create a new PitEntry and set the timeout based on the current time and
- * the interest lifetime.
- *
- * @param interest A shared_ptr for the interest
- * @param onData A function object to call when a matching data packet is received.
- * @param onTimeout A function object to call if the interest times out.
- * If onTimeout is an empty OnTimeout(), this does not use it.
- * @param scheduler Scheduler instance to use to schedule a timeout event. The scheduled
- * event will be automatically cancelled when pending interest is destroyed.
+ * the Interest lifetime.
+ * @param interest shared_ptr for the Interest
+ * @param dataCallback function to call when matching Data packet is received
+ * @param nackCallback function to call when Nack matching Interest is received
+ * @param timeoutCallback function to call if Interest times out
+ * @param scheduler Scheduler instance to use to schedule a timeout event. The scheduled
+ * event will be automatically cancelled when pending Interest is destroyed.
*/
- PendingInterest(shared_ptr<const Interest> interest, const OnData& onData,
- const OnTimeout& onTimeout, Scheduler& scheduler)
+ PendingInterest(shared_ptr<const Interest> interest,
+ const DataCallback& dataCallback,
+ const NackCallback& nackCallback,
+ const TimeoutCallback& timeoutCallback,
+ Scheduler& scheduler)
: m_interest(interest)
- , m_onData(onData)
- , m_onTimeout(onTimeout)
+ , m_dataCallback(dataCallback)
+ , m_nackCallback(nackCallback)
+ , m_timeoutCallback(timeoutCallback)
, m_timeoutEvent(scheduler)
{
m_timeoutEvent =
@@ -65,10 +66,10 @@
/**
* @return the Interest
*/
- const Interest&
+ shared_ptr<const Interest>
getInterest() const
{
- return *m_interest;
+ return m_interest;
}
/**
@@ -76,9 +77,19 @@
* @note If the DataCallback is an empty function, this method does nothing.
*/
void
- invokeDataCallback(Data& data)
+ invokeDataCallback(const Data& data)
{
- m_onData(*m_interest, data);
+ m_dataCallback(*m_interest, data);
+ }
+
+ /**
+ * @brief invokes the NackCallback
+ * @note If the NackCallback is an empty function, this method does nothing.
+ */
+ void
+ invokeNackCallback(const lp::Nack& nack)
+ {
+ m_nackCallback(*m_interest, nack);
}
/**
@@ -98,8 +109,8 @@
void
invokeTimeoutCallback()
{
- if (m_onTimeout) {
- m_onTimeout(*m_interest);
+ if (m_timeoutCallback) {
+ m_timeoutCallback(*m_interest);
}
BOOST_ASSERT(m_deleter);
@@ -108,8 +119,9 @@
private:
shared_ptr<const Interest> m_interest;
- const OnData m_onData;
- const OnTimeout m_onTimeout;
+ DataCallback m_dataCallback;
+ NackCallback m_nackCallback;
+ TimeoutCallback m_timeoutCallback;
util::scheduler::ScopedEventId m_timeoutEvent;
std::function<void()> m_deleter;
};
@@ -133,7 +145,7 @@
operator()(const shared_ptr<const PendingInterest>& pendingInterest) const
{
return (reinterpret_cast<const PendingInterestId*>(
- &pendingInterest->getInterest()) == m_id);
+ pendingInterest->getInterest().get()) == m_id);
}
private:
const PendingInterestId* m_id;
diff --git a/src/face.cpp b/src/face.cpp
index 1579ee5..336e85e 100644
--- a/src/face.cpp
+++ b/src/face.cpp
@@ -134,24 +134,50 @@
Face::~Face() = default;
const PendingInterestId*
-Face::expressInterest(const Interest& interest, const OnData& onData, const OnTimeout& onTimeout)
+Face::expressInterest(const Interest& interest,
+ const DataCallback& afterSatisfied,
+ const NackCallback& afterNacked,
+ const TimeoutCallback& afterTimeout)
{
shared_ptr<Interest> interestToExpress = make_shared<Interest>(interest);
// Use `interestToExpress` to avoid wire format creation for the original Interest
- if (interestToExpress->wireEncode().size() > MAX_NDN_PACKET_SIZE)
+ if (interestToExpress->wireEncode().size() > MAX_NDN_PACKET_SIZE) {
BOOST_THROW_EXCEPTION(Error("Interest size exceeds maximum limit"));
+ }
// If the same ioService thread, dispatch directly calls the method
- m_ioService.dispatch([=] { m_impl->asyncExpressInterest(interestToExpress, onData, onTimeout); });
+ m_ioService.dispatch([=] { m_impl->asyncExpressInterest(interestToExpress, afterSatisfied,
+ afterNacked, afterTimeout); });
return reinterpret_cast<const PendingInterestId*>(interestToExpress.get());
}
const PendingInterestId*
+Face::expressInterest(const Interest& interest,
+ const OnData& onData,
+ const OnTimeout& onTimeout)
+{
+ return this->expressInterest(
+ interest,
+ [onData] (const Interest& interest, const Data& data) {
+ if (onData != nullptr) {
+ onData(interest, const_cast<Data&>(data));
+ }
+ },
+ [onTimeout] (const Interest& interest, const lp::Nack& nack) {
+ if (onTimeout != nullptr) {
+ onTimeout(interest);
+ }
+ },
+ onTimeout
+ );
+}
+
+const PendingInterestId*
Face::expressInterest(const Name& name,
const Interest& tmpl,
- const OnData& onData, const OnTimeout& onTimeout/* = OnTimeout()*/)
+ const OnData& onData, const OnTimeout& onTimeout/* = nullptr*/)
{
return expressInterest(Interest(tmpl)
.setName(name)
@@ -181,6 +207,12 @@
}
void
+Face::put(const lp::Nack& nack)
+{
+ m_ioService.dispatch([=] { m_impl->asyncPutNack(make_shared<lp::Nack>(nack)); });
+}
+
+void
Face::removePendingInterest(const PendingInterestId* pendingInterestId)
{
m_ioService.post([=] { m_impl->asyncRemovePendingInterest(pendingInterestId); });
@@ -427,28 +459,50 @@
m_impl->m_ioServiceWork.reset();
}
+/**
+ * @brief extract local fields from NDNLPv2 packet and tag onto a network layer packet
+ */
+template<typename NETPKT>
+static void
+extractLpLocalFields(NETPKT& netPacket, const lp::Packet& lpPacket)
+{
+ if (lpPacket.has<lp::IncomingFaceIdField>()) {
+ netPacket.getLocalControlHeader().
+ setIncomingFaceId(lpPacket.get<lp::IncomingFaceIdField>());
+ }
+}
+
void
Face::onReceiveElement(const Block& blockFromDaemon)
{
- const Block& block = nfd::LocalControlHeader::getPayload(blockFromDaemon);
+ lp::Packet lpPacket(blockFromDaemon); // bare Interest/Data is a valid lp::Packet,
+ // no need to distinguish
- if (block.type() == tlv::Interest)
- {
- shared_ptr<Interest> interest = make_shared<Interest>(block);
- if (&block != &blockFromDaemon)
- interest->getLocalControlHeader().wireDecode(blockFromDaemon);
-
- m_impl->processInterestFilters(*interest);
+ Buffer::const_iterator begin, end;
+ std::tie(begin, end) = lpPacket.get<lp::FragmentField>();
+ Block netPacket(&*begin, std::distance(begin, end));
+ switch (netPacket.type()) {
+ case tlv::Interest: {
+ shared_ptr<Interest> interest = make_shared<Interest>(netPacket);
+ if (lpPacket.has<lp::NackField>()) {
+ auto nack = make_shared<lp::Nack>(std::move(*interest));
+ nack->setHeader(lpPacket.get<lp::NackField>());
+ extractLpLocalFields(*nack, lpPacket);
+ m_impl->nackPendingInterests(*nack);
+ }
+ else {
+ extractLpLocalFields(*interest, lpPacket);
+ m_impl->processInterestFilters(*interest);
+ }
+ break;
}
- else if (block.type() == tlv::Data)
- {
- shared_ptr<Data> data = make_shared<Data>(block);
- if (&block != &blockFromDaemon)
- data->getLocalControlHeader().wireDecode(blockFromDaemon);
-
+ case tlv::Data: {
+ shared_ptr<Data> data = make_shared<Data>(netPacket);
+ extractLpLocalFields(*data, lpPacket);
m_impl->satisfyPendingInterests(*data);
+ break;
}
- // ignore any other type
+ }
}
} // namespace ndn
diff --git a/src/face.hpp b/src/face.hpp
index 80e7930..a07e293 100644
--- a/src/face.hpp
+++ b/src/face.hpp
@@ -29,6 +29,7 @@
#include "interest-filter.hpp"
#include "data.hpp"
#include "security/signing-info.hpp"
+#include "lp/nack.hpp"
#define NDN_FACE_KEEP_DEPRECATED_REGISTRATION_SIGNING
@@ -60,12 +61,29 @@
}
/**
+ * @brief Callback called when expressed Interest gets satisfied with a Data packet
+ */
+typedef function<void(const Interest&, const Data&)> DataCallback;
+
+/**
+ * @brief Callback called when Nack is sent in response to expressed Interest
+ */
+typedef function<void(const Interest&, const lp::Nack&)> NackCallback;
+
+/**
+ * @brief Callback called when expressed Interest times out
+ */
+typedef function<void(const Interest&)> TimeoutCallback;
+
+/**
* @brief Callback called when expressed Interest gets satisfied with Data packet
+ * @deprecated use DataCallback
*/
typedef function<void(const Interest&, Data&)> OnData;
/**
* @brief Callback called when expressed Interest times out
+ * @deprecated use TimeoutCallback
*/
typedef function<void(const Interest&)> OnTimeout;
@@ -94,7 +112,6 @@
*/
typedef function<void(const std::string&)> UnregisterPrefixFailureCallback;
-
/**
* @brief Abstraction to communicate with local or remote NDN forwarder
*/
@@ -196,18 +213,36 @@
public: // consumer
/**
* @brief Express Interest
+ * @param interest the Interest; a copy will be made, so that the caller is not
+ * required to maintain the argument unchanged
+ * @param afterSatisfied function to be invoked if Data is returned
+ * @param afterNacked function to be invoked if Network NACK is returned
+ * @param afterTimeout function to be invoked if neither Data nor Network NACK
+ * is returned within InterestLifetime
+ */
+ const PendingInterestId*
+ expressInterest(const Interest& interest,
+ const DataCallback& afterSatisfied,
+ const NackCallback& afterNacked,
+ const TimeoutCallback& afterTimeout);
+
+ /**
+ * @brief Express Interest
*
* @param interest An Interest to be expressed
* @param onData Callback to be called when a matching data packet is received
- * @param onTimeout (optional) A function object to call if the interest times out
+ * @param onTimeout (optional) A function object to call if the interest times out or is Nacked
*
* @return The pending interest ID which can be used with removePendingInterest
*
* @throws Error when Interest size exceeds maximum limit (MAX_NDN_PACKET_SIZE)
+ *
+ * @deprecated use expressInterest(Interest, DataCallback, NackCallback, TimeoutCallback)
*/
const PendingInterestId*
expressInterest(const Interest& interest,
- const OnData& onData, const OnTimeout& onTimeout = OnTimeout());
+ const OnData& onData,
+ const OnTimeout& onTimeout = nullptr);
/**
* @brief Express Interest using name and Interest template
@@ -215,16 +250,19 @@
* @param name Name of the Interest
* @param tmpl Interest template to fill parameters
* @param onData Callback to be called when a matching data packet is received
- * @param onTimeout (optional) A function object to call if the interest times out
+ * @param onTimeout (optional) A function object to call if the interest times out or is Nacked
*
* @return Opaque pending interest ID which can be used with removePendingInterest
*
* @throws Error when Interest size exceeds maximum limit (MAX_NDN_PACKET_SIZE)
+ *
+ * @deprecated use expressInterest(Interest, DataCallback, NackCallback, TimeoutCallback)
*/
const PendingInterestId*
expressInterest(const Name& name,
const Interest& tmpl,
- const OnData& onData, const OnTimeout& onTimeout = OnTimeout());
+ const OnData& onData,
+ const OnTimeout& onTimeout = nullptr);
/**
* @brief Cancel previously expressed Interest
@@ -573,6 +611,14 @@
void
put(const Data& data);
+ /**
+ * @brief sends a Network NACK
+ * @param nack the Nack; a copy will be made, so that the caller is not required to
+ * maintain the argument unchanged
+ */
+ void
+ put(const lp::Nack& nack);
+
public: // IO routine
/**
* @brief Process any data to receive or call timeout callbacks.
@@ -635,7 +681,7 @@
construct(shared_ptr<Transport> transport, KeyChain& keyChain);
void
- onReceiveElement(const Block& wire);
+ onReceiveElement(const Block& blockFromDaemon);
void
asyncShutdown();
diff --git a/src/lp/detail/field-decl.hpp b/src/lp/detail/field-decl.hpp
index 6804b8a..49e3c07 100644
--- a/src/lp/detail/field-decl.hpp
+++ b/src/lp/detail/field-decl.hpp
@@ -43,10 +43,6 @@
BOOST_CONCEPT_REQUIRES(((WireDecodable<T>)), (T))
decode(const Block& wire)
{
- if (wire.type() != TlvType::value) {
- BOOST_THROW_EXCEPTION(ndn::tlv::Error("Unexpected TLV type " + to_string(wire.type())));
- }
-
T type;
type.wireDecode(wire);
return type;
@@ -59,10 +55,6 @@
static uint64_t
decode(const Block& wire)
{
- if (wire.type() != TlvType::value) {
- BOOST_THROW_EXCEPTION(ndn::tlv::Error("Unexpected TLV type " + to_string(wire.type())));
- }
-
return readNonNegativeInteger(wire);
}
};
@@ -73,10 +65,6 @@
static std::pair<Buffer::const_iterator, Buffer::const_iterator>
decode(const Block& wire)
{
- if (wire.type() != TlvType::value) {
- BOOST_THROW_EXCEPTION(ndn::tlv::Error("Unexpected TLV type " + to_string(wire.type())));
- }
-
if (wire.value_size() == 0) {
BOOST_THROW_EXCEPTION(ndn::tlv::Error(to_string(wire.type()) + " must not be empty"));
}
@@ -129,12 +117,23 @@
typedef std::integral_constant<uint64_t, TYPE> TlvType;
typedef std::integral_constant<bool, REPEATABLE> IsRepeatable;
+ /** \brief decodes a field
+ * \param wire a Block with top-level type \p TYPE
+ * \return value of the field
+ */
static ValueType
decode(const Block& wire)
{
+ if (wire.type() != TlvType::value) {
+ BOOST_THROW_EXCEPTION(ndn::tlv::Error("Unexpected TLV type " + std::to_string(wire.type())));
+ }
+
return DecodeHelper<TlvType, ValueType>::decode(wire);
}
+ /** \brief encodes a field and prepends to \p encoder its Block with top-level type \p TYPE
+ * \param value value of the field
+ */
template<typename encoding::Tag TAG, typename T>
static size_t
encode(EncodingImpl<TAG>& encoder, const T& value)
diff --git a/src/lp/nack.hpp b/src/lp/nack.hpp
index 5fb57da..a50221c 100644
--- a/src/lp/nack.hpp
+++ b/src/lp/nack.hpp
@@ -27,6 +27,7 @@
#include "../common.hpp"
#include "../tag-host.hpp"
#include "../interest.hpp"
+#include "../management/nfd-local-control-header.hpp"
#include "nack-header.hpp"
@@ -87,6 +88,18 @@
return *this;
}
+ nfd::LocalControlHeader&
+ getLocalControlHeader()
+ {
+ return m_interest.getLocalControlHeader();
+ }
+
+ const nfd::LocalControlHeader&
+ getLocalControlHeader() const
+ {
+ return m_interest.getLocalControlHeader();
+ }
+
public: // NackHeader proxy
NackReason
getReason() const
diff --git a/src/lp/packet.cpp b/src/lp/packet.cpp
index 67957af..82b7821 100644
--- a/src/lp/packet.cpp
+++ b/src/lp/packet.cpp
@@ -63,13 +63,21 @@
template size_t
Packet::wireEncode<encoding::EstimatorTag>(EncodingImpl<encoding::EstimatorTag>& encoder) const;
-const Block&
+const Block
Packet::wireEncode() const
{
if (m_wire.hasWire()) {
return m_wire;
}
+ // If no header or trailer, return bare network packet
+ Block::element_container elements = m_wire.elements();
+ if (elements.size() == 1 && elements.front().type() == FragmentField::TlvType::value) {
+ elements.front().parse();
+ elements.front().elements().front().parse();
+ return elements.front().elements().front();
+ }
+
EncodingEstimator estimator;
size_t estimatedSize = wireEncode(estimator);
diff --git a/src/lp/packet.hpp b/src/lp/packet.hpp
index 53b998d..eed6b51 100644
--- a/src/lp/packet.hpp
+++ b/src/lp/packet.hpp
@@ -55,7 +55,7 @@
/**
* \brief encode packet into wire format
*/
- const Block&
+ const Block
wireEncode() const;
/**
diff --git a/src/util/dummy-client-face.cpp b/src/util/dummy-client-face.cpp
index cfa28a5..1bfce8f 100644
--- a/src/util/dummy-client-face.cpp
+++ b/src/util/dummy-client-face.cpp
@@ -33,10 +33,12 @@
{
public:
void
- receive(const Block& block)
+ receive(Block block)
{
- if (static_cast<bool>(m_receiveCallback))
+ block.encode();
+ if (static_cast<bool>(m_receiveCallback)) {
m_receiveCallback(block);
+ }
}
virtual void
@@ -99,21 +101,47 @@
DummyClientFace::construct(const Options& options)
{
m_transport->onSendBlock.connect([this] (const Block& blockFromDaemon) {
- const Block& block = nfd::LocalControlHeader::getPayload(blockFromDaemon);
+ try {
+ Block packet(blockFromDaemon);
+ packet.encode();
+ lp::Packet lpPacket(packet);
- if (block.type() == tlv::Interest) {
- shared_ptr<Interest> interest = make_shared<Interest>(block);
- if (&block != &blockFromDaemon)
- interest->getLocalControlHeader().wireDecode(blockFromDaemon);
+ Buffer::const_iterator begin, end;
+ std::tie(begin, end) = lpPacket.get<lp::FragmentField>();
+ Block block(&*begin, std::distance(begin, end));
- onSendInterest(*interest);
+ if (block.type() == tlv::Interest) {
+ shared_ptr<Interest> interest = make_shared<Interest>(block);
+ if (lpPacket.has<lp::NackField>()) {
+ shared_ptr<lp::Nack> nack = make_shared<lp::Nack>(std::move(*interest));
+ nack->setHeader(lpPacket.get<lp::NackField>());
+ if (lpPacket.has<lp::NextHopFaceIdField>()) {
+ nack->getLocalControlHeader().setNextHopFaceId(lpPacket.get<lp::NextHopFaceIdField>());
+ }
+ onSendNack(*nack);
+ }
+ else {
+ if (lpPacket.has<lp::NextHopFaceIdField>()) {
+ interest->getLocalControlHeader().
+ setNextHopFaceId(lpPacket.get<lp::NextHopFaceIdField>());
+ }
+ onSendInterest(*interest);
+ }
+ }
+ else if (block.type() == tlv::Data) {
+ shared_ptr<Data> data = make_shared<Data>(block);
+
+ if (lpPacket.has<lp::CachePolicyField>()) {
+ if (lpPacket.get<lp::CachePolicyField>().getPolicy() == lp::CachePolicyType::NO_CACHE) {
+ data->getLocalControlHeader().setCachingPolicy(nfd::LocalControlHeader::CachingPolicy::NO_CACHE);
+ }
+ }
+
+ onSendData(*data);
+ }
}
- else if (block.type() == tlv::Data) {
- shared_ptr<Data> data = make_shared<Data>(block);
- if (&block != &blockFromDaemon)
- data->getLocalControlHeader().wireDecode(blockFromDaemon);
-
- onSendData(*data);
+ catch (tlv::Error& e) {
+ throw tlv::Error("Error decoding NDNLPv2 packet");
}
});
@@ -133,6 +161,9 @@
onSendData.connect([this] (const Data& data) {
this->sentDatas.push_back(data);
});
+ onSendNack.connect([this] (const lp::Nack& nack) {
+ this->sentNacks.push_back(nack);
+ });
}
void
@@ -168,22 +199,19 @@
void
DummyClientFace::receive(const Packet& packet)
{
- // do not restrict what injected control header can contain
- if (!packet.getLocalControlHeader().empty(nfd::LocalControlHeader::ENCODE_ALL)) {
+ lp::Packet lpPacket(packet.wireEncode());
- Block header = packet.getLocalControlHeader().wireEncode(packet,
- nfd::LocalControlHeader::ENCODE_ALL);
- Block payload = packet.wireEncode();
+ nfd::LocalControlHeader localControlHeader = packet.getLocalControlHeader();
- EncodingBuffer encoder(header.size() + payload.size(), header.size() + payload.size());
- encoder.appendByteArray(header.wire(), header.size());
- encoder.appendByteArray(payload.wire(), payload.size());
-
- m_transport->receive(encoder.block());
+ if (localControlHeader.hasIncomingFaceId()) {
+ lpPacket.add<lp::IncomingFaceIdField>(localControlHeader.getIncomingFaceId());
}
- else {
- m_transport->receive(packet.wireEncode());
+
+ if (localControlHeader.hasNextHopFaceId()) {
+ lpPacket.add<lp::NextHopFaceIdField>(localControlHeader.getNextHopFaceId());
}
+
+ m_transport->receive(lpPacket.wireEncode());
}
template void
@@ -192,6 +220,23 @@
template void
DummyClientFace::receive<Data>(const Data& packet);
+template<>
+void
+DummyClientFace::receive<lp::Nack>(const lp::Nack& nack)
+{
+ lp::Packet lpPacket;
+ lpPacket.add<lp::NackField>(nack.getHeader());
+ Block interest = nack.getInterest().wireEncode();
+ lpPacket.add<lp::FragmentField>(make_pair(interest.begin(), interest.end()));
+
+ nfd::LocalControlHeader localControlHeader = nack.getLocalControlHeader();
+
+ if (localControlHeader.hasIncomingFaceId()) {
+ lpPacket.add<lp::IncomingFaceIdField>(localControlHeader.getIncomingFaceId());
+ }
+
+ m_transport->receive(lpPacket.wireEncode());
+}
shared_ptr<DummyClientFace>
makeDummyClientFace(const DummyClientFace::Options& options)
diff --git a/src/util/dummy-client-face.hpp b/src/util/dummy-client-face.hpp
index 2813779..9f46ca6 100644
--- a/src/util/dummy-client-face.hpp
+++ b/src/util/dummy-client-face.hpp
@@ -24,6 +24,7 @@
#include "../face.hpp"
#include "signal.hpp"
+#include "../lp/packet.hpp"
namespace ndn {
namespace util {
@@ -104,6 +105,14 @@
*/
std::vector<Data> sentDatas;
+ /** \brief NACKs sent out of this DummyClientFace
+ *
+ * Sent NACKs are appended to this container if options.enablePacketLogger is true.
+ * User of this class is responsible for cleaning up the container, if necessary.
+ * After .put, .processEvents must be called before the NACK would show up here.
+ */
+ std::vector<lp::Nack> sentNacks;
+
/** \brief emits whenever an Interest is sent
*
* After .expressInterest, .processEvents must be called before this signal would be emitted.
@@ -116,6 +125,12 @@
*/
Signal<DummyClientFace, Data> onSendData;
+ /** \brief emits whenever a NACK is sent
+ *
+ * After .put, .processEvents must be called before this signal would be emitted.
+ */
+ Signal<DummyClientFace, lp::Nack> onSendNack;
+
private:
shared_ptr<Transport> m_transport;
};
diff --git a/tests/unit-tests/face.t.cpp b/tests/unit-tests/face.t.cpp
index 94d0935..4127655 100644
--- a/tests/unit-tests/face.t.cpp
+++ b/tests/unit-tests/face.t.cpp
@@ -64,6 +64,45 @@
face->expressInterest(Interest("/Hello/World", time::milliseconds(50)),
[&] (const Interest& i, const Data& d) {
BOOST_CHECK(i.getName().isPrefixOf(d.getName()));
+ BOOST_CHECK_EQUAL(i.getName(), "/Hello/World");
+ ++nData;
+ },
+ bind([] {
+ BOOST_FAIL("Unexpected Nack");
+ }),
+ bind([] {
+ BOOST_FAIL("Unexpected timeout");
+ }));
+
+ advanceClocks(time::milliseconds(1), 40);
+
+ face->receive(*util::makeData("/Bye/World/a"));
+ face->receive(*util::makeData("/Hello/World/a"));
+
+ advanceClocks(time::milliseconds(1), 100);
+
+ BOOST_CHECK_EQUAL(nData, 1);
+ BOOST_CHECK_EQUAL(face->sentInterests.size(), 1);
+ BOOST_CHECK_EQUAL(face->sentDatas.size(), 0);
+
+ size_t nTimeouts = 0;
+ face->expressInterest(Interest("/Hello/World/a/2", time::milliseconds(50)),
+ bind([]{}),
+ bind([]{}),
+ bind([&nTimeouts] {
+ ++nTimeouts;
+ }));
+ advanceClocks(time::milliseconds(10), 100);
+ BOOST_CHECK_EQUAL(nTimeouts, 1);
+}
+
+// test case for deprecated expressInterest implementation
+BOOST_AUTO_TEST_CASE(DeprecatedExpressInterestData)
+{
+ size_t nData = 0;
+ face->expressInterest(Interest("/Hello/World", time::milliseconds(50)),
+ [&] (const Interest& i, const Data& d) {
+ BOOST_CHECK(i.getName().isPrefixOf(d.getName()));
++nData;
},
bind([] {
@@ -72,8 +111,8 @@
advanceClocks(time::milliseconds(1), 40);
- face->receive(*util::makeData("/Bye/World/!"));
- face->receive(*util::makeData("/Hello/World/!"));
+ face->receive(*util::makeData("/Bye/World/a"));
+ face->receive(*util::makeData("/Hello/World/a"));
advanceClocks(time::milliseconds(1), 100);
@@ -81,7 +120,7 @@
BOOST_CHECK_EQUAL(face->sentInterests.size(), 1);
BOOST_CHECK_EQUAL(face->sentDatas.size(), 0);
- face->expressInterest(Interest("/Hello/World/!", time::milliseconds(50)),
+ face->expressInterest(Interest("/Hello/World/a", time::milliseconds(50)),
[&] (const Interest& i, const Data& d) {
BOOST_CHECK(i.getName().isPrefixOf(d.getName()));
++nData;
@@ -90,7 +129,7 @@
BOOST_FAIL("Unexpected timeout");
}));
advanceClocks(time::milliseconds(1), 40);
- face->receive(*util::makeData("/Hello/World/!/1/xxxxx"));
+ face->receive(*util::makeData("/Hello/World/a/1/xxxxx"));
advanceClocks(time::milliseconds(1), 100);
@@ -99,7 +138,7 @@
BOOST_CHECK_EQUAL(face->sentDatas.size(), 0);
size_t nTimeouts = 0;
- face->expressInterest(Interest("/Hello/World/!/2", time::milliseconds(50)),
+ face->expressInterest(Interest("/Hello/World/a/2", time::milliseconds(50)),
bind([]{}),
bind([&nTimeouts] {
++nTimeouts;
@@ -113,6 +152,30 @@
size_t nTimeouts = 0;
face->expressInterest(Interest("/Hello/World", time::milliseconds(50)),
bind([] {
+ BOOST_FAIL("Unexpected ata");
+ }),
+ bind([] {
+ BOOST_FAIL("Unexpected Nack");
+ }),
+ [&nTimeouts] (const Interest& i) {
+ BOOST_CHECK_EQUAL(i.getName(), "/Hello/World");
+ ++nTimeouts;
+ });
+
+ advanceClocks(time::milliseconds(10), 100);
+
+ BOOST_CHECK_EQUAL(nTimeouts, 1);
+ BOOST_CHECK_EQUAL(face->sentInterests.size(), 1);
+ BOOST_CHECK_EQUAL(face->sentDatas.size(), 0);
+ BOOST_CHECK_EQUAL(face->sentNacks.size(), 0);
+}
+
+// test case for deprecated expressInterest implementation
+BOOST_AUTO_TEST_CASE(DeprecatedExpressInterestTimeout)
+{
+ size_t nTimeouts = 0;
+ face->expressInterest(Interest("/Hello/World", time::milliseconds(50)),
+ bind([] {
BOOST_FAIL("Unexpected data");
}),
bind([&nTimeouts] {
@@ -126,6 +189,41 @@
BOOST_CHECK_EQUAL(face->sentDatas.size(), 0);
}
+BOOST_AUTO_TEST_CASE(ExpressInterestNack)
+{
+ size_t nNacks = 0;
+
+ Interest interest("/Hello/World", time::milliseconds(50));
+
+ face->expressInterest(interest,
+ bind([] {
+ BOOST_FAIL("Unexpected Data");
+ }),
+ [&] (const Interest& i, const lp::Nack& n) {
+ BOOST_CHECK(i.getName().isPrefixOf(n.getInterest().getName()));
+ BOOST_CHECK_EQUAL(i.getName(), "/Hello/World");
+ BOOST_CHECK_EQUAL(n.getReason(), lp::NackReason::DUPLICATE);
+ ++nNacks;
+ },
+ bind([] {
+ BOOST_FAIL("Unexpected timeout");
+ }));
+
+ advanceClocks(time::milliseconds(1), 40);
+
+ lp::Nack nack(face->sentInterests[0]);
+ nack.setReason(lp::NackReason::DUPLICATE);
+
+ BOOST_CHECK_EQUAL(face->sentNacks.size(), 0);
+
+ face->receive(nack);
+
+ advanceClocks(time::milliseconds(1), 100);
+
+ BOOST_CHECK_EQUAL(nNacks, 1);
+ BOOST_CHECK_EQUAL(face->sentInterests.size(), 1);
+}
+
BOOST_AUTO_TEST_CASE(RemovePendingInterest)
{
const PendingInterestId* interestId =
@@ -466,9 +564,7 @@
{
face->setInterestFilter("/Hello/World",
[] (const InterestFilter&, const Interest& i) {
- BOOST_CHECK(i.getLocalControlHeader().hasNextHopFaceId());
BOOST_CHECK(i.getLocalControlHeader().hasIncomingFaceId());
- BOOST_CHECK_EQUAL(i.getNextHopFaceId(), 1000);
BOOST_CHECK_EQUAL(i.getIncomingFaceId(), 2000);
},
bind([]{}),
@@ -504,10 +600,8 @@
{
face->expressInterest(Interest("/Hello/World", time::milliseconds(50)),
[&] (const Interest& i, const Data& d) {
- BOOST_CHECK(d.getLocalControlHeader().hasNextHopFaceId());
BOOST_CHECK(d.getLocalControlHeader().hasIncomingFaceId());
BOOST_CHECK_EQUAL(d.getIncomingFaceId(), 2000);
- BOOST_CHECK_EQUAL(d.getLocalControlHeader().getNextHopFaceId(), 1000);
},
bind([] {
BOOST_FAIL("Unexpected timeout");
@@ -524,6 +618,20 @@
advanceClocks(time::milliseconds(10), 100);
}
+BOOST_AUTO_TEST_CASE(PutNack)
+{
+ lp::Nack nack(Interest("/Hello/World", time::milliseconds(50)));
+ nack.setReason(lp::NackReason::NO_ROUTE);
+
+ BOOST_CHECK_EQUAL(face->sentNacks.size(), 0);
+
+ face->put(nack);
+
+ advanceClocks(time::milliseconds(10));
+
+ BOOST_CHECK_EQUAL(face->sentNacks.size(), 1);
+}
+
BOOST_AUTO_TEST_CASE(DestructionWithoutCancellingPendingInterests) // Bug #2518
{
face->expressInterest(Interest("/Hello/World", time::milliseconds(50)),
diff --git a/tests/unit-tests/lp/packet.t.cpp b/tests/unit-tests/lp/packet.t.cpp
index f532214..4150e0c 100644
--- a/tests/unit-tests/lp/packet.t.cpp
+++ b/tests/unit-tests/lp/packet.t.cpp
@@ -58,7 +58,9 @@
BOOST_AUTO_TEST_CASE(EncodeFragment)
{
static const uint8_t expectedBlock[] = {
- 0x64, 0x04, // LpPacket
+ 0x64, 0x08, // LpPacket
+ 0x51, 0x02, // Sequence
+ 0x03, 0xe8,
0x50, 0x02, // Fragment
0x03, 0xe8,
};
@@ -69,8 +71,9 @@
Packet packet;
BOOST_CHECK_NO_THROW(packet.add<FragmentField>(std::make_pair(buf.begin(), buf.end())));
+ BOOST_CHECK_NO_THROW(packet.add<SequenceField>(1000));
Block wire;
- BOOST_REQUIRE_NO_THROW(wire = packet.wireEncode());
+ BOOST_CHECK_NO_THROW(wire = packet.wireEncode());
BOOST_CHECK_EQUAL_COLLECTIONS(expectedBlock, expectedBlock + sizeof(expectedBlock),
wire.begin(), wire.end());
}
@@ -316,19 +319,9 @@
BOOST_CHECK_NO_THROW(packet.wireDecode(wire));
BOOST_CHECK_EQUAL(1, packet.count<FragmentField>());
- static const uint8_t expectedBlock[] = {
- 0x64, 0x0e, // LpPacket
- 0x50, 0x0c, // Fragment
- 0x05, 0x0a, // Interest
- 0x07, 0x02, // Name
- 0x03, 0xe8,
- 0x0a, 0x04, // Nonce
- 0x01, 0x02, 0x03, 0x04,
- };
-
Block encoded;
BOOST_CHECK_NO_THROW(encoded = packet.wireEncode());
- BOOST_CHECK_EQUAL_COLLECTIONS(expectedBlock, expectedBlock + sizeof(expectedBlock),
+ BOOST_CHECK_EQUAL_COLLECTIONS(inputBlock, inputBlock + sizeof(inputBlock),
encoded.begin(), encoded.end());
}