face: NDNLPv2 fragmentation and reassembly
refs #3171
Change-Id: If29035b697b904ee49cb86d9248be488657c6f9e
diff --git a/daemon/face/generic-link-service.cpp b/daemon/face/generic-link-service.cpp
index 23e2ce2..d6d6c02 100644
--- a/daemon/face/generic-link-service.cpp
+++ b/daemon/face/generic-link-service.cpp
@@ -30,14 +30,26 @@
NFD_LOG_INIT("GenericLinkService");
+// Construct the counter set, handing the reassembler to the nReassembling counter.
+// NOTE(review): nReassembling presumably reports a value read from the
+// reassembler on demand - confirm against the counter's declaration.
+GenericLinkServiceCounters::GenericLinkServiceCounters(const LpReassembler& reassembler)
+ : nReassembling(reassembler)
+{
+}
+
+// Default options: local fields, fragmentation and reassembly all start disabled.
GenericLinkService::Options::Options()
: allowLocalFields(false)
+ , allowFragmentation(false)
+ , allowReassembly(false)
{
}
+// Construct the link service from a copy of the given options.
+// The GenericLinkServiceCounters base is initialized before any data member, so
+// it is handed a reference to m_reassembler before m_reassembler is constructed.
+// NOTE(review): presumably the base only stores that reference at this point - confirm.
GenericLinkService::GenericLinkService(const GenericLinkService::Options& options)
- : m_options(options)
+ : GenericLinkServiceCounters(m_reassembler)
+ , m_options(options)
+ , m_fragmenter(m_options.fragmenterOptions, this)
+ , m_reassembler(m_options.reassemblerOptions, this)
+ // assignSequence() pre-increments m_lastSeqNo, so the first value assigned is
+ // this initial value plus one.
+ // NOTE(review): if m_lastSeqNo is unsigned, -2 wraps to a very large value - confirm intent.
+ , m_lastSeqNo(-2)
{
+ // increment nReassemblyTimeouts every time the reassembler emits beforeTimeout
+ m_reassembler.beforeTimeout.connect(bind([this] { ++nReassemblyTimeouts; }));
}
void
@@ -47,9 +59,8 @@
if (m_options.allowLocalFields) {
encodeLocalFields(interest, lpPacket);
}
- Transport::Packet packet;
- packet.packet = lpPacket.wireEncode();
- sendPacket(std::move(packet));
+
+ sendNetPacket(std::move(lpPacket));
}
void
@@ -59,9 +70,8 @@
if (m_options.allowLocalFields) {
encodeLocalFields(data, lpPacket);
}
- Transport::Packet packet;
- packet.packet = lpPacket.wireEncode();
- sendPacket(std::move(packet));
+
+ sendNetPacket(std::move(lpPacket));
}
void
@@ -72,9 +82,8 @@
if (m_options.allowLocalFields) {
encodeLocalFields(nack.getInterest(), lpPacket);
}
- Transport::Packet packet;
- packet.packet = lpPacket.wireEncode();
- sendPacket(std::move(packet));
+
+ sendNetPacket(std::move(lpPacket));
}
bool
@@ -117,6 +126,59 @@
}
+// Send one network-layer packet over the transport, fragmenting it first when
+// fragmentation is enabled and the transport has a finite MTU.
+//
+// If the fragmenter reports failure, nFragmentationErrors is incremented and
+// nothing is sent. Sequence numbers are assigned only when more than one
+// fragment results. Each encoded fragment larger than a finite MTU is counted
+// in nOutOverMtu, logged, and skipped; the remaining fragments are still sent.
+//
+// pkt is moved from when it is sent without going through the fragmenter.
void
+GenericLinkService::sendNetPacket(lp::Packet&& pkt)
+{
+  std::vector<lp::Packet> frags;
+  const ssize_t mtu = this->getTransport()->getMtu();
+  if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
+    bool isOk = false;
+    std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
+    if (!isOk) {
+      // fragmentation failed (warning is logged by LpFragmenter)
+      ++nFragmentationErrors;
+      return;
+    }
+  }
+  else {
+    // pkt is an rvalue reference and is not used again below, so hand its
+    // contents to the vector instead of copying the whole packet
+    frags.push_back(std::move(pkt));
+  }
+
+  if (frags.size() > 1) {
+    // sequence is needed only if packet is fragmented
+    assignSequences(frags);
+  }
+  else {
+    // even if indexed fragmentation is enabled, the fragmenter should not
+    // fragment the packet if it can fit in MTU
+    BOOST_ASSERT(frags.size() > 0);
+    BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
+    BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
+  }
+
+  for (const lp::Packet& frag : frags) {
+    Transport::Packet tp(frag.wireEncode());
+    // a fragment can still exceed the MTU, e.g. when fragmentation is disabled
+    if (mtu != MTU_UNLIMITED && tp.packet.size() > static_cast<size_t>(mtu)) {
+      ++nOutOverMtu;
+      NFD_LOG_FACE_WARN("attempt to send packet over MTU limit");
+      continue;
+    }
+    sendPacket(std::move(tp));
+  }
+}
+
+// Stamp pkt with the next link-layer sequence number.
+// The counter is advanced before it is used, so the value written is the
+// previous m_lastSeqNo plus one.
+void
+GenericLinkService::assignSequence(lp::Packet& pkt)
+{
+  const auto nextSeq = ++m_lastSeqNo;
+  pkt.set<lp::SequenceField>(nextSeq);
+}
+
+// Assign a sequence number to every packet in pkts, front to back.
+void
+GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
+{
+  for (lp::Packet& fragment : pkts) {
+    this->assignSequence(fragment);
+  }
+}
+
+void
GenericLinkService::doReceivePacket(Transport::Packet&& packet)
{
try {
@@ -127,38 +189,55 @@
return;
}
- if (pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) {
- NFD_LOG_FACE_WARN("received fragment, but reassembly not implemented: DROP");
+ if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
+ !m_options.allowReassembly) {
+ NFD_LOG_FACE_WARN("received fragment, but reassembly disabled: DROP");
return;
}
- ndn::Buffer::const_iterator fragBegin, fragEnd;
- std::tie(fragBegin, fragEnd) = pkt.get<lp::FragmentField>();
- Block netPkt(&*fragBegin, std::distance(fragBegin, fragEnd));
+ bool isReassembled = false;
+ Block netPkt;
+ lp::Packet firstPkt;
+ std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.receiveFragment(packet.remoteEndpoint,
+ pkt);
+ if (isReassembled) {
+ this->decodeNetPacket(netPkt, firstPkt);
+ }
+ }
+ catch (const tlv::Error& e) {
+ ++this->nInLpInvalid;
+ NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
+ }
+}
+// Dispatch a network-layer packet to the matching decoder based on its TLV-TYPE.
+// netPkt is the network-layer packet block; firstPkt is the link-layer packet
+// whose header fields are passed along to the decoder.
+// An Interest goes to decodeNack when firstPkt carries a NackField and to
+// decodeInterest otherwise; a Data goes to decodeData. Any other TLV-TYPE, or a
+// tlv::Error thrown while decoding, increments nInNetInvalid and the packet is
+// dropped with a warning.
+void
+GenericLinkService::decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt)
+{
+ try {
switch (netPkt.type()) {
case tlv::Interest:
- if (pkt.has<lp::NackField>()) {
- this->decodeNack(netPkt, pkt);
+ // an Interest accompanied by a NackField is handled as a Nack
+ if (firstPkt.has<lp::NackField>()) {
+ this->decodeNack(netPkt, firstPkt);
}
else {
- this->decodeInterest(netPkt, pkt);
+ this->decodeInterest(netPkt, firstPkt);
}
break;
case tlv::Data:
- this->decodeData(netPkt, pkt);
+ this->decodeData(netPkt, firstPkt);
break;
default:
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("unrecognized network-layer packet TLV-TYPE " << netPkt.type() << ": DROP");
return;
}
}
catch (const tlv::Error& e) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
}
}
-
void
GenericLinkService::decodeInterest(const Block& netPkt, const lp::Packet& firstPkt)
{
@@ -179,6 +258,7 @@
}
if (firstPkt.has<lp::CachePolicyField>()) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("received CachePolicy with Interest: DROP");
return;
}
@@ -199,11 +279,13 @@
auto data = make_shared<Data>(netPkt);
if (firstPkt.has<lp::NackField>()) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("received Nack with Data: DROP");
return;
}
if (firstPkt.has<lp::NextHopFaceIdField>()) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("received NextHopFaceId with Data: DROP");
return;
}
@@ -216,6 +298,7 @@
data->setCachingPolicy(ndn::nfd::LocalControlHeader::CachingPolicy::NO_CACHE);
break;
default:
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("unrecognized CachePolicyType " << policy << ": DROP");
return;
}
@@ -242,11 +325,13 @@
nack.setHeader(firstPkt.get<lp::NackField>());
if (firstPkt.has<lp::NextHopFaceIdField>()) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("received NextHopFaceId with Nack: DROP");
return;
}
if (firstPkt.has<lp::CachePolicyField>()) {
+ ++this->nInNetInvalid;
NFD_LOG_FACE_WARN("received CachePolicy with Nack: DROP");
return;
}