face: NDNLPv2 fragmentation and reassembly
refs #3171
Change-Id: If29035b697b904ee49cb86d9248be488657c6f9e
diff --git a/daemon/face/generic-link-service.cpp b/daemon/face/generic-link-service.cpp
index 23e2ce2..d6d6c02 100644
--- a/daemon/face/generic-link-service.cpp
+++ b/daemon/face/generic-link-service.cpp
@@ -30,14 +30,26 @@
NFD_LOG_INIT("GenericLinkService");
+GenericLinkServiceCounters::GenericLinkServiceCounters(const LpReassembler& reassembler)
+ : nReassembling(reassembler)
+{
+}
+
GenericLinkService::Options::Options()
: allowLocalFields(false)
+ , allowFragmentation(false)
+ , allowReassembly(false)
{
}
GenericLinkService::GenericLinkService(const GenericLinkService::Options& options)
- : m_options(options)
+ : GenericLinkServiceCounters(m_reassembler)
+ , m_options(options)
+ , m_fragmenter(m_options.fragmenterOptions, this)
+ , m_reassembler(m_options.reassemblerOptions, this)
+ , m_lastSeqNo(-2)
{
+ m_reassembler.beforeTimeout.connect(bind([this] { ++nReassemblyTimeouts; }));
}
void
@@ -47,9 +59,8 @@
if (m_options.allowLocalFields) {
encodeLocalFields(interest, lpPacket);
}
- Transport::Packet packet;
- packet.packet = lpPacket.wireEncode();
- sendPacket(std::move(packet));
+
+ sendNetPacket(std::move(lpPacket));
}
void
@@ -59,9 +70,8 @@
if (m_options.allowLocalFields) {
encodeLocalFields(data, lpPacket);
}
- Transport::Packet packet;
- packet.packet = lpPacket.wireEncode();
- sendPacket(std::move(packet));
+
+ sendNetPacket(std::move(lpPacket));
}
void
@@ -72,9 +82,8 @@
if (m_options.allowLocalFields) {
encodeLocalFields(nack.getInterest(), lpPacket);
}
- Transport::Packet packet;
- packet.packet = lpPacket.wireEncode();
- sendPacket(std::move(packet));
+
+ sendNetPacket(std::move(lpPacket));
}
bool
@@ -117,6 +126,59 @@
}
void
+GenericLinkService::sendNetPacket(lp::Packet&& pkt)
+{
+ std::vector<lp::Packet> frags;
+ const ssize_t mtu = this->getTransport()->getMtu();
+ if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
+ bool isOk = false;
+ std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
+ if (!isOk) {
+ // fragmentation failed (warning is logged by LpFragmenter)
+ ++nFragmentationErrors;
+ return;
+ }
+ }
+ else {
+ frags.push_back(pkt);
+ }
+
+ if (frags.size() > 1) {
+ // sequence is needed only if packet is fragmented
+ assignSequences(frags);
+ }
+ else {
+ // even if indexed fragmentation is enabled, the fragmenter should not
+ // fragment the packet if it can fit in MTU
+ BOOST_ASSERT(frags.size() > 0);
+ BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
+ BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
+ }
+
+ for (const lp::Packet& frag : frags) {
+ Transport::Packet tp(frag.wireEncode());
+ if (mtu != MTU_UNLIMITED && tp.packet.size() > static_cast<size_t>(mtu)) {
+ ++nOutOverMtu;
+ NFD_LOG_FACE_WARN("attempt to send packet over MTU limit");
+ continue;
+ }
+ sendPacket(std::move(tp));
+ }
+}
+
+void
+GenericLinkService::assignSequence(lp::Packet& pkt)
+{
+ pkt.set<lp::SequenceField>(++m_lastSeqNo);
+}
+
+void
+GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
+{
+ std::for_each(pkts.begin(), pkts.end(), bind(&GenericLinkService::assignSequence, this, _1));
+}
+
+void
GenericLinkService::doReceivePacket(Transport::Packet&& packet)
{
try {
@@ -127,38 +189,55 @@
return;
}
- if (pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) {
- NFD_LOG_FACE_WARN("received fragment, but reassembly not implemented: DROP");
+ if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
+ !m_options.allowReassembly) {
+ NFD_LOG_FACE_WARN("received fragment, but reassembly disabled: DROP");
return;
}
- ndn::Buffer::const_iterator fragBegin, fragEnd;
- std::tie(fragBegin, fragEnd) = pkt.get<lp::FragmentField>();
- Block netPkt(&*fragBegin, std::distance(fragBegin, fragEnd));
+ bool isReassembled = false;
+ Block netPkt;
+ lp::Packet firstPkt;
+ std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.receiveFragment(packet.remoteEndpoint,
+ pkt);
+ if (isReassembled) {
+ this->decodeNetPacket(netPkt, firstPkt);
+ }
+ }
+ catch (const tlv::Error& e) {
+ ++this->nInLpInvalid;
+ NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
+ }
+}
+void
+GenericLinkService::decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt)
+{
+ try {
switch (netPkt.type()) {
case tlv::Interest:
- if (pkt.has<lp::NackField>()) {
- this->decodeNack(netPkt, pkt);
+ if (firstPkt.has<lp::NackField>()) {
+ this->decodeNack(netPkt, firstPkt);
}
else {
- this->decodeInterest(netPkt, pkt);
+ this->decodeInterest(netPkt, firstPkt);
}
break;
case tlv::Data:
- this->decodeData(netPkt, pkt);
+ this->decodeData(netPkt, firstPkt);
break;
default:
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("unrecognized network-layer packet TLV-TYPE " << netPkt.type() << ": DROP");
return;
}
}
catch (const tlv::Error& e) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
}
}
-
void
GenericLinkService::decodeInterest(const Block& netPkt, const lp::Packet& firstPkt)
{
@@ -179,6 +258,7 @@
}
if (firstPkt.has<lp::CachePolicyField>()) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("received CachePolicy with Interest: DROP");
return;
}
@@ -199,11 +279,13 @@
auto data = make_shared<Data>(netPkt);
if (firstPkt.has<lp::NackField>()) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("received Nack with Data: DROP");
return;
}
if (firstPkt.has<lp::NextHopFaceIdField>()) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("received NextHopFaceId with Data: DROP");
return;
}
@@ -216,6 +298,7 @@
data->setCachingPolicy(ndn::nfd::LocalControlHeader::CachingPolicy::NO_CACHE);
break;
default:
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("unrecognized CachePolicyType " << policy << ": DROP");
return;
}
@@ -242,11 +325,13 @@
nack.setHeader(firstPkt.get<lp::NackField>());
if (firstPkt.has<lp::NextHopFaceIdField>()) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("received NextHopFaceId with Nack: DROP");
return;
}
if (firstPkt.has<lp::CachePolicyField>()) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("received CachePolicy with Nack: DROP");
return;
}
diff --git a/daemon/face/generic-link-service.hpp b/daemon/face/generic-link-service.hpp
index ae3328f..d7cc22f 100644
--- a/daemon/face/generic-link-service.hpp
+++ b/daemon/face/generic-link-service.hpp
@@ -30,16 +30,54 @@
#include "core/logger.hpp"
#include "link-service.hpp"
-
-#include <ndn-cxx/lp/packet.hpp>
+#include "lp-fragmenter.hpp"
+#include "lp-reassembler.hpp"
namespace nfd {
namespace face {
+/** \brief counters provided by GenericLinkService
+ * \note The type name 'GenericLinkServiceCounters' is implementation detail.
+ * Use 'GenericLinkService::Counters' in public API.
+ */
+class GenericLinkServiceCounters : public virtual LinkService::Counters
+{
+public:
+ explicit
+ GenericLinkServiceCounters(const LpReassembler& reassembler);
+
+ /** \brief count of failed fragmentations
+ */
+ PacketCounter nFragmentationErrors;
+
+ /** \brief count of outgoing LpPackets dropped due to exceeding MTU limit
+ *
+ * If this counter is non-zero, the operator should enable fragmentation.
+ */
+ PacketCounter nOutOverMtu;
+
+ /** \brief count of invalid LpPackets dropped before reassembly
+ */
+ PacketCounter nInLpInvalid;
+
+ /** \brief count of network-layer packets currently being reassembled
+ */
+ SizeCounter<LpReassembler> nReassembling;
+
+ /** \brief count of dropped partial network-layer packets due to reassembly timeout
+ */
+ PacketCounter nReassemblyTimeouts;
+
+ /** \brief count of invalid reassembled network-layer packets dropped
+ */
+ PacketCounter nInNetInvalid;
+};
+
/** \brief GenericLinkService is a LinkService that implements the NDNLPv2 protocol
* \sa http://redmine.named-data.net/projects/nfd/wiki/NDNLPv2
*/
class GenericLinkService : public LinkService
+ , protected virtual GenericLinkServiceCounters
{
public:
/** \brief Options that control the behavior of GenericLinkService
@@ -50,13 +88,31 @@
Options();
public:
- // TODO #3171: fragmentation and reassembly options
-
/** \brief enables encoding of IncomingFaceId, and decoding of NextHopFaceId and CachePolicy
*/
bool allowLocalFields;
+
+ /** \brief enables fragmentation
+ */
+ bool allowFragmentation;
+
+ /** \brief options for fragmentation
+ */
+ LpFragmenter::Options fragmenterOptions;
+
+ /** \brief enables reassembly
+ */
+ bool allowReassembly;
+
+ /** \brief options for reassembly
+ */
+ LpReassembler::Options reassemblerOptions;
};
+ /** \brief counters provided by GenericLinkService
+ */
+ typedef GenericLinkServiceCounters Counters;
+
explicit
GenericLinkService(const Options& options = Options());
@@ -70,7 +126,10 @@
void
setOptions(const Options& options);
-private: // send path entrypoint
+ virtual const Counters&
+ getCounters() const DECL_OVERRIDE;
+
+private: // send path
/** \brief sends Interest
*/
void
@@ -87,13 +146,6 @@
void
doSendNack(const ndn::lp::Nack& nack) DECL_OVERRIDE;
-private: // receive path entrypoint
- /** \brief receives Packet
- */
- void
- doReceivePacket(Transport::Packet&& packet) DECL_OVERRIDE;
-
-private: // encoding and decoding
/** \brief encode IncomingFaceId into LpPacket and verify local fields
*/
static bool
@@ -104,6 +156,38 @@
static bool
encodeLocalFields(const Data& data, lp::Packet& lpPacket);
+ /** \brief encode and send a complete network layer packet
+ * \param pkt LpPacket containing a complete network layer packet
+ */
+ void
+ sendNetPacket(lp::Packet&& pkt);
+
+ /** \brief assign a sequence number to an LpPacket
+ */
+ void
+ assignSequence(lp::Packet& pkt);
+
+ /** \brief assign consecutive sequence numbers to LpPackets
+ */
+ void
+ assignSequences(std::vector<lp::Packet>& pkts);
+
+private: // receive path
+ /** \brief receive Packet from Transport
+ */
+ void
+ doReceivePacket(Transport::Packet&& packet) DECL_OVERRIDE;
+
+ /** \brief decode incoming network-layer packet
+ * \param netPkt reassembled network-layer packet
+ * \param firstPkt LpPacket of first fragment
+ *
+ * If decoding is successful, a receive signal is emitted;
+ * otherwise, a warning is logged.
+ */
+ void
+ decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt);
+
/** \brief decode incoming Interest
* \param netPkt reassembled network-layer packet; TLV-TYPE must be Interest
* \param firstPkt LpPacket of first fragment; must not have Nack field
@@ -142,6 +226,9 @@
private:
Options m_options;
+ LpFragmenter m_fragmenter;
+ LpReassembler m_reassembler;
+ lp::Sequence m_lastSeqNo;
};
inline const GenericLinkService::Options&
@@ -156,6 +243,12 @@
m_options = options;
}
+inline const GenericLinkService::Counters&
+GenericLinkService::getCounters() const
+{
+ return *this;
+}
+
} // namespace face
} // namespace nfd
diff --git a/daemon/face/lp-fragmenter.cpp b/daemon/face/lp-fragmenter.cpp
new file mode 100644
index 0000000..b54f7d5
--- /dev/null
+++ b/daemon/face/lp-fragmenter.cpp
@@ -0,0 +1,162 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2015, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "lp-fragmenter.hpp"
+#include "link-service.hpp"
+#include <ndn-cxx/encoding/tlv.hpp>
+
+namespace nfd {
+namespace face {
+
+NFD_LOG_INIT("LpFragmenter");
+
+static_assert(lp::tlv::LpPacket < 253, "LpPacket TLV-TYPE must fit in 1 octet");
+static_assert(lp::tlv::Sequence < 253, "Sequence TLV-TYPE must fit in 1 octet");
+static_assert(lp::tlv::FragIndex < 253, "FragIndex TLV-TYPE must fit in 1 octet");
+static_assert(lp::tlv::FragCount < 253, "FragCount TLV-TYPE must fit in 1 octet");
+static_assert(lp::tlv::Fragment < 253, "Fragment TLV-TYPE must fit in 1 octet");
+
+/** \brief maximum overhead on a single fragment,
+ * not counting other NDNLPv2 headers
+ */
+static const size_t MAX_SINGLE_FRAG_OVERHEAD =
+ 1 + 9 + // LpPacket TLV-TYPE and TLV-LENGTH
+ 1 + 1 + 8 + // Sequence TLV
+ 1 + 9; // Fragment TLV-TYPE and TLV-LENGTH
+
+/** \brief maximum overhead of adding fragmentation to payload,
+ * not counting other NDNLPv2 headers
+ */
+static const size_t MAX_FRAG_OVERHEAD =
+ 1 + 9 + // LpPacket TLV-TYPE and TLV-LENGTH
+ 1 + 1 + 8 + // Sequence TLV
+ 1 + 1 + 8 + // FragIndex TLV
+ 1 + 1 + 8 + // FragCount TLV
+ 1 + 9; // Fragment TLV-TYPE and TLV-LENGTH
+
+LpFragmenter::Options::Options()
+ : nMaxFragments(400)
+{
+}
+
+LpFragmenter::LpFragmenter(const LpFragmenter::Options& options, const LinkService* linkService)
+ : m_options(options)
+ , m_linkService(linkService)
+{
+}
+
+void
+LpFragmenter::setOptions(const Options& options)
+{
+ m_options = options;
+}
+
+const LinkService*
+LpFragmenter::getLinkService() const
+{
+ return m_linkService;
+}
+
+std::tuple<bool, std::vector<lp::Packet>>
+LpFragmenter::fragmentPacket(const lp::Packet& packet, size_t mtu)
+{
+ BOOST_ASSERT(packet.has<lp::FragmentField>());
+ BOOST_ASSERT(!packet.has<lp::FragIndexField>());
+ BOOST_ASSERT(!packet.has<lp::FragCountField>());
+
+ if (MAX_SINGLE_FRAG_OVERHEAD + packet.wireEncode().size() <= mtu) {
+ // fast path: fragmentation not needed
+ // To qualify for fast path, the packet must have space for adding a sequence number,
+ // because another NDNLPv2 feature may require the sequence number.
+ return std::make_tuple(true, std::vector<lp::Packet>{packet});
+ }
+
+ ndn::Buffer::const_iterator netPktBegin, netPktEnd;
+ std::tie(netPktBegin, netPktEnd) = packet.get<lp::FragmentField>();
+ size_t netPktSize = std::distance(netPktBegin, netPktEnd);
+
+ // compute size of other NDNLPv2 headers to be placed on the first fragment
+ size_t firstHeaderSize = 0;
+ const Block& packetWire = packet.wireEncode();
+ if (packetWire.type() == lp::tlv::LpPacket) {
+ for (const Block& element : packetWire.elements()) {
+ if (element.type() != lp::tlv::Fragment) {
+ firstHeaderSize += element.size();
+ }
+ }
+ }
+
+ // compute payload size
+ if (MAX_FRAG_OVERHEAD + firstHeaderSize + 1 > mtu) { // 1-octet fragment
+ NFD_LOG_FACE_WARN("fragmentation error, MTU too small for first fragment: DROP");
+ return std::make_tuple(false, std::vector<lp::Packet>{});
+ }
+ size_t firstPayloadSize = std::min(netPktSize, mtu - firstHeaderSize - MAX_FRAG_OVERHEAD);
+ size_t payloadSize = mtu - MAX_FRAG_OVERHEAD;
+ size_t fragCount = 1 + ((netPktSize - firstPayloadSize) / payloadSize) +
+ ((netPktSize - firstPayloadSize) % payloadSize != 0);
+
+ // compute FragCount
+ if (fragCount > m_options.nMaxFragments) {
+ NFD_LOG_FACE_WARN("fragmentation error, FragCount over limit: DROP");
+ return std::make_pair(false, std::vector<lp::Packet>{});
+ }
+
+ // populate fragments
+ std::vector<lp::Packet> frags(fragCount);
+ frags.front() = packet; // copy input packet to preserve other NDNLPv2 fields
+ size_t fragIndex = 0;
+ auto fragBegin = netPktBegin,
+ fragEnd = fragBegin + firstPayloadSize;
+ while (fragBegin < netPktEnd) {
+ lp::Packet& frag = frags[fragIndex];
+ frag.add<lp::FragIndexField>(fragIndex);
+ frag.add<lp::FragCountField>(fragCount);
+ frag.set<lp::FragmentField>(std::make_pair(fragBegin, fragEnd));
+ BOOST_ASSERT(frag.wireEncode().size() <= mtu);
+
+ ++fragIndex;
+ fragBegin = fragEnd;
+ fragEnd = std::min(netPktEnd, fragBegin + payloadSize);
+ }
+ BOOST_ASSERT(fragIndex == fragCount);
+
+ return std::make_pair(true, frags);
+}
+
+std::ostream&
+operator<<(std::ostream& os, const FaceLogHelper<LpFragmenter>& flh)
+{
+ if (flh.obj.getLinkService() == nullptr) {
+ os << "[id=0,local=unknown,remote=unknown] ";
+ }
+ else {
+ os << FaceLogHelper<LinkService>(*flh.obj.getLinkService());
+ }
+ return os;
+}
+
+} // namespace face
+} // namespace nfd
diff --git a/daemon/face/lp-fragmenter.hpp b/daemon/face/lp-fragmenter.hpp
new file mode 100644
index 0000000..8275d0f
--- /dev/null
+++ b/daemon/face/lp-fragmenter.hpp
@@ -0,0 +1,92 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2015, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef NFD_DAEMON_FACE_LP_FRAGMENTER_HPP
+#define NFD_DAEMON_FACE_LP_FRAGMENTER_HPP
+
+#include "face-log.hpp"
+
+#include <ndn-cxx/lp/packet.hpp>
+
+namespace nfd {
+namespace face {
+
+class LinkService;
+
+/** \brief fragments network-layer packets into NDNLPv2 link-layer packets
+ * \sa http://redmine.named-data.net/projects/nfd/wiki/NDNLPv2
+ */
+class LpFragmenter
+{
+public:
+ /** \brief Options that control the behavior of LpFragmenter
+ */
+ class Options
+ {
+ public:
+ Options();
+
+ public:
+ /** \brief maximum number of fragments in a packet
+ */
+ size_t nMaxFragments;
+ };
+
+ explicit
+ LpFragmenter(const Options& options = Options(), const LinkService* linkService = nullptr);
+
+ /** \brief set options for fragmenter
+ */
+ void
+ setOptions(const Options& options);
+
+ /** \return LinkService that owns this instance
+ *
+ * This is only used for logging, and may be nullptr.
+ */
+ const LinkService*
+ getLinkService() const;
+
+ /** \brief fragments a network-layer packet into link-layer packets
+ * \param packet an LpPacket that contains a network-layer packet;
+ * must have Fragment field, must not have FragIndex and FragCount fields
+ * \param mtu maximum allowable LpPacket size after fragmentation and sequence number assignment
+ * \return whether fragmentation succeeded, fragmented packets without sequence number
+ */
+ std::tuple<bool, std::vector<lp::Packet>>
+ fragmentPacket(const lp::Packet& packet, size_t mtu);
+
+private:
+ Options m_options;
+ const LinkService* m_linkService;
+};
+
+std::ostream&
+operator<<(std::ostream& os, const FaceLogHelper<LpFragmenter>& flh);
+
+} // namespace face
+} // namespace nfd
+
+#endif // NFD_DAEMON_FACE_LP_FRAGMENTER_HPP
diff --git a/daemon/face/lp-reassembler.cpp b/daemon/face/lp-reassembler.cpp
new file mode 100644
index 0000000..88ef1c1
--- /dev/null
+++ b/daemon/face/lp-reassembler.cpp
@@ -0,0 +1,176 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2015, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "lp-reassembler.hpp"
+#include "link-service.hpp"
+#include <numeric>
+
+namespace nfd {
+namespace face {
+
+NFD_LOG_INIT("LpReassembler");
+
+LpReassembler::Options::Options()
+ : nMaxFragments(400)
+ , reassemblyTimeout(time::milliseconds(500))
+{
+}
+
+LpReassembler::LpReassembler(const LpReassembler::Options& options, const LinkService* linkService)
+ : m_options(options)
+ , m_linkService(linkService)
+{
+}
+
+std::tuple<bool, Block, lp::Packet>
+LpReassembler::receiveFragment(Transport::EndpointId remoteEndpoint, const lp::Packet& packet)
+{
+ BOOST_ASSERT(packet.has<lp::FragmentField>());
+
+ static auto FALSE_RETURN = std::make_tuple(false, Block(), lp::Packet());
+
+ // read and check FragIndex and FragCount
+ uint64_t fragIndex = 0;
+ uint64_t fragCount = 1;
+ if (packet.has<lp::FragIndexField>()) {
+ fragIndex = packet.get<lp::FragIndexField>();
+ }
+ if (packet.has<lp::FragCountField>()) {
+ fragCount = packet.get<lp::FragCountField>();
+ }
+
+ if (fragIndex >= fragCount) {
+ NFD_LOG_FACE_WARN("reassembly error, FragIndex>=FragCount: DROP");
+ return FALSE_RETURN;
+ }
+
+ if (fragCount > m_options.nMaxFragments) {
+ NFD_LOG_FACE_WARN("reassembly error, FragCount over limit: DROP");
+ return FALSE_RETURN;
+ }
+
+ // check for fast path
+ if (fragIndex == 0 && fragCount == 1) {
+ ndn::Buffer::const_iterator fragBegin, fragEnd;
+ std::tie(fragBegin, fragEnd) = packet.get<lp::FragmentField>();
+ Block netPkt(&*fragBegin, std::distance(fragBegin, fragEnd));
+ return std::make_tuple(true, netPkt, packet);
+ }
+
+ // check Sequence and compute message identifier
+ if (!packet.has<lp::SequenceField>()) {
+ NFD_LOG_FACE_WARN("reassembly error, Sequence missing: DROP");
+ return FALSE_RETURN;
+ }
+ lp::Sequence messageIdentifier = packet.get<lp::SequenceField>() - fragIndex;
+ Key key = std::make_tuple(remoteEndpoint, messageIdentifier);
+
+ // add to PartialPacket
+ PartialPacket& pp = m_partialPackets[key];
+ if (pp.fragCount == 0) { // new PartialPacket
+ pp.fragCount = fragCount;
+ pp.nReceivedFragments = 0;
+ pp.fragments.resize(fragCount);
+ }
+ else {
+ if (fragCount != pp.fragCount) {
+ NFD_LOG_FACE_WARN("reassembly error, FragCount changed: DROP");
+ return FALSE_RETURN;
+ }
+ }
+
+ if (pp.fragments[fragIndex].has<lp::SequenceField>()) {
+ NFD_LOG_FACE_TRACE("fragment already received: DROP");
+ return FALSE_RETURN;
+ }
+
+ pp.fragments[fragIndex] = packet;
+ ++pp.nReceivedFragments;
+
+ // check complete condition
+ if (pp.nReceivedFragments == pp.fragCount) {
+ Block reassembled = doReassembly(key);
+ lp::Packet firstFrag(std::move(pp.fragments[0]));
+ m_partialPackets.erase(key);
+ return std::make_tuple(true, reassembled, firstFrag);
+ }
+
+ // set drop timer
+ pp.dropTimer = scheduler::schedule(m_options.reassemblyTimeout,
+ bind(&LpReassembler::timeoutPartialPacket, this, key));
+
+ return FALSE_RETURN;
+}
+
+Block
+LpReassembler::doReassembly(const Key& key)
+{
+ PartialPacket& pp = m_partialPackets[key];
+
+ size_t payloadSize = std::accumulate(pp.fragments.begin(), pp.fragments.end(), 0,
+ [&] (size_t sum, const lp::Packet& pkt) -> size_t {
+ ndn::Buffer::const_iterator fragBegin, fragEnd;
+ std::tie(fragBegin, fragEnd) = pkt.get<lp::FragmentField>();
+ return sum + std::distance(fragBegin, fragEnd);
+ });
+
+ ndn::Buffer fragBuffer(payloadSize);
+ ndn::Buffer::iterator it = fragBuffer.begin();
+
+ for (const lp::Packet& frag : pp.fragments) {
+ ndn::Buffer::const_iterator fragBegin, fragEnd;
+ std::tie(fragBegin, fragEnd) = frag.get<lp::FragmentField>();
+ it = std::copy(fragBegin, fragEnd, it);
+ }
+
+ return Block(&*(fragBuffer.cbegin()), std::distance(fragBuffer.cbegin(), fragBuffer.cend()));
+}
+
+void
+LpReassembler::timeoutPartialPacket(const Key& key)
+{
+ auto it = m_partialPackets.find(key);
+ if (it == m_partialPackets.end()) {
+ return;
+ }
+
+ this->beforeTimeout(std::get<0>(key), it->second.nReceivedFragments);
+ m_partialPackets.erase(it);
+}
+
+std::ostream&
+operator<<(std::ostream& os, const FaceLogHelper<LpReassembler>& flh)
+{
+ if (flh.obj.getLinkService() == nullptr) {
+ os << "[id=0,local=unknown,remote=unknown] ";
+ }
+ else {
+ os << FaceLogHelper<LinkService>(*flh.obj.getLinkService());
+ }
+ return os;
+}
+
+} // namespace face
+} // namespace nfd
diff --git a/daemon/face/lp-reassembler.hpp b/daemon/face/lp-reassembler.hpp
new file mode 100644
index 0000000..fd0d874
--- /dev/null
+++ b/daemon/face/lp-reassembler.hpp
@@ -0,0 +1,160 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2015, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef NFD_DAEMON_FACE_LP_REASSEMBLER_HPP
+#define NFD_DAEMON_FACE_LP_REASSEMBLER_HPP
+
+#include "core/scheduler.hpp"
+#include "face-log.hpp"
+#include "transport.hpp"
+
+#include <ndn-cxx/lp/packet.hpp>
+
+namespace nfd {
+namespace face {
+
+class LinkService;
+
+/** \brief reassembles fragmented network-layer packets
+ * \sa http://redmine.named-data.net/projects/nfd/wiki/NDNLPv2
+ */
+class LpReassembler : noncopyable
+{
+public:
+ /** \brief Options that control the behavior of LpReassembler
+ */
+ class Options
+ {
+ public:
+ Options();
+
+ public:
+ /** \brief maximum number of fragments in a packet
+ *
+ * LpPackets with FragCount over this limit are dropped.
+ */
+ size_t nMaxFragments;
+
+ /** \brief timeout before a partially reassembled packet is dropped
+ */
+ time::nanoseconds reassemblyTimeout;
+ };
+
+ explicit
+ LpReassembler(const Options& options = Options(), const LinkService* linkService = nullptr);
+
+ /** \brief set options for reassembler
+ */
+ void
+ setOptions(const Options& options);
+
+ /** \return LinkService that owns this instance
+ *
+ * This is only used for logging, and may be nullptr.
+ */
+ const LinkService*
+ getLinkService() const;
+
+ /** \brief adds received fragment to buffer
+ * \param remoteEndpoint endpoint whose sends the packet
+ * \param packet received fragment;
+ * must have Fragment field
+ * \return whether network-layer packet has been completely received,
+ * the reassembled network-layer packet,
+ * and the first fragment for inspecting other NDNLPv2 headers
+ * \throw tlv::Error packet is malformed
+ */
+ std::tuple<bool, Block, lp::Packet>
+ receiveFragment(Transport::EndpointId remoteEndpoint, const lp::Packet& packet);
+
+ /** \brief count of partial packets
+ */
+ size_t
+ size() const;
+
+ /** \brief signals before a partial packet is dropped due to timeout
+ *
+ * If a partial packet is incomplete and no new fragment is received
+ * within Options::reassemblyTimeout, it would be dropped due to timeout.
+ * Before it's erased, this signal is emitted with the remote endpoint,
+ * and the number of fragments being dropped.
+ */
+ signal::Signal<LpReassembler, Transport::EndpointId, size_t> beforeTimeout;
+
+private:
+ /** \brief holds all fragments of packet until reassembled
+ */
+ struct PartialPacket
+ {
+ std::vector<lp::Packet> fragments;
+ size_t fragCount; ///< total fragments
+ size_t nReceivedFragments; ///< number of received fragments
+ scheduler::ScopedEventId dropTimer;
+ };
+
+ /** \brief index key for PartialPackets
+ */
+ typedef std::tuple<
+ Transport::EndpointId, // remoteEndpoint
+ lp::Sequence // message identifier (sequence of the first fragment)
+ > Key;
+
+ Block
+ doReassembly(const Key& key);
+
+ void
+ timeoutPartialPacket(const Key& key);
+
+private:
+ Options m_options;
+ std::map<Key, PartialPacket> m_partialPackets;
+ const LinkService* m_linkService;
+};
+
+std::ostream&
+operator<<(std::ostream& os, const FaceLogHelper<LpReassembler>& flh);
+
+inline void
+LpReassembler::setOptions(const Options& options)
+{
+ m_options = options;
+}
+
+inline const LinkService*
+LpReassembler::getLinkService() const
+{
+ return m_linkService;
+}
+
+inline size_t
+LpReassembler::size() const
+{
+ return m_partialPackets.size();
+}
+
+} // namespace face
+} // namespace nfd
+
+#endif // NFD_DAEMON_FACE_LP_REASSEMBLER_HPP