face: NDNLPv2 fragmentation and reassembly

refs #3171

Change-Id: If29035b697b904ee49cb86d9248be488657c6f9e
diff --git a/daemon/face/generic-link-service.cpp b/daemon/face/generic-link-service.cpp
index 23e2ce2..d6d6c02 100644
--- a/daemon/face/generic-link-service.cpp
+++ b/daemon/face/generic-link-service.cpp
@@ -30,14 +30,26 @@
 
 NFD_LOG_INIT("GenericLinkService");
 
+GenericLinkServiceCounters::GenericLinkServiceCounters(const LpReassembler& reassembler)
+  : nReassembling(reassembler) // counter is built from the reassembler reference; presumably it reads it on demand - confirm
+{
+}
+
 GenericLinkService::Options::Options()
   : allowLocalFields(false)
+  , allowFragmentation(false) // sendNetPacket only fragments when this is turned on
+  , allowReassembly(false) // while off, doReceivePacket drops packets carrying FragIndex/FragCount
 {
 }
 
 GenericLinkService::GenericLinkService(const GenericLinkService::Options& options)
-  : m_options(options)
+  : GenericLinkServiceCounters(m_reassembler) // NOTE(review): m_reassembler is not constructed yet when this base initializer runs; confirm the base only binds the reference
+  , m_options(options)
+  , m_fragmenter(m_options.fragmenterOptions, this) // reads the m_options copy, not the ctor argument
+  , m_reassembler(m_options.reassemblerOptions, this) // reads the m_options copy, not the ctor argument
+  , m_lastSeqNo(-2) // assignSequence pre-increments, so the first sequence issued is -1 (wraps if the member is unsigned - confirm intent)
 {
+  m_reassembler.beforeTimeout.connect(bind([this] { ++nReassemblyTimeouts; })); // bind() presumably swallows the signal's arguments - confirm
 }
 
 void
@@ -47,9 +59,8 @@
   if (m_options.allowLocalFields) {
     encodeLocalFields(interest, lpPacket);
   }
-  Transport::Packet packet;
-  packet.packet = lpPacket.wireEncode();
-  sendPacket(std::move(packet));
+
+  sendNetPacket(std::move(lpPacket));
 }
 
 void
@@ -59,9 +70,8 @@
   if (m_options.allowLocalFields) {
     encodeLocalFields(data, lpPacket);
   }
-  Transport::Packet packet;
-  packet.packet = lpPacket.wireEncode();
-  sendPacket(std::move(packet));
+
+  sendNetPacket(std::move(lpPacket));
 }
 
 void
@@ -72,9 +82,8 @@
   if (m_options.allowLocalFields) {
     encodeLocalFields(nack.getInterest(), lpPacket);
   }
-  Transport::Packet packet;
-  packet.packet = lpPacket.wireEncode();
-  sendPacket(std::move(packet));
+
+  sendNetPacket(std::move(lpPacket));
 }
 
 bool
@@ -117,6 +126,59 @@
 }
 
 void
+GenericLinkService::sendNetPacket(lp::Packet&& pkt) // fragments pkt when enabled and MTU-limited, then sends every piece
+{
+  std::vector<lp::Packet> frags;
+  const ssize_t mtu = this->getTransport()->getMtu();
+  if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
+    bool isOk = false;
+    std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
+    if (!isOk) {
+      // fragmentation failed (warning is logged by LpFragmenter); nothing is sent
+      ++nFragmentationErrors;
+      return;
+    }
+  }
+  else {
+    frags.push_back(std::move(pkt)); // pkt is an rvalue and is not used again on this path: move instead of copying
+  }
+
+  if (frags.size() > 1) {
+    // sequence is needed only if packet is fragmented
+    assignSequences(frags);
+  }
+  else {
+    // even if indexed fragmentation is enabled, the fragmenter should not
+    // fragment the packet if it can fit in MTU
+    BOOST_ASSERT(frags.size() > 0);
+    BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
+    BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
+  }
+
+  for (const lp::Packet& frag : frags) {
+    Transport::Packet tp(frag.wireEncode());
+    if (mtu != MTU_UNLIMITED && tp.packet.size() > static_cast<size_t>(mtu)) {
+      ++nOutOverMtu; // an encoded piece that still exceeds the MTU is counted and skipped
+      NFD_LOG_FACE_WARN("attempt to send packet over MTU limit");
+      continue; // the remaining pieces are still attempted
+    }
+    sendPacket(std::move(tp));
+  }
+}
+
+void
+GenericLinkService::assignSequence(lp::Packet& pkt)
+{
+  pkt.set<lp::SequenceField>(++m_lastSeqNo); // pre-increment: the packet gets the value following the last one issued
+}
+
+void
+GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
+{
+  std::for_each(pkts.begin(), pkts.end(), [this] (lp::Packet& frag) { this->assignSequence(frag); }); // stamps in vector order
+}
+
+void
 GenericLinkService::doReceivePacket(Transport::Packet&& packet)
 {
   try {
@@ -127,38 +189,55 @@
       return;
     }
 
-    if (pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) {
-      NFD_LOG_FACE_WARN("received fragment, but reassembly not implemented: DROP");
+    if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
+        !m_options.allowReassembly) {
+      NFD_LOG_FACE_WARN("received fragment, but reassembly disabled: DROP");
       return;
     }
 
-    ndn::Buffer::const_iterator fragBegin, fragEnd;
-    std::tie(fragBegin, fragEnd) = pkt.get<lp::FragmentField>();
-    Block netPkt(&*fragBegin, std::distance(fragBegin, fragEnd));
+    bool isReassembled = false;
+    Block netPkt;
+    lp::Packet firstPkt;
+    std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.receiveFragment(packet.remoteEndpoint,
+                                                                              pkt);
+    if (isReassembled) {
+      this->decodeNetPacket(netPkt, firstPkt);
+    }
+  }
+  catch (const tlv::Error& e) {
+    ++this->nInLpInvalid;
+    NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
+  }
+}
 
+void
+GenericLinkService::decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt) // dispatches on the network-layer TLV-TYPE
+{
+  try {
     switch (netPkt.type()) {
       case tlv::Interest:
-        if (pkt.has<lp::NackField>()) {
-          this->decodeNack(netPkt, pkt);
+        if (firstPkt.has<lp::NackField>()) { // an Interest accompanied by a Nack field is decoded as a Nack
+          this->decodeNack(netPkt, firstPkt);
         }
         else {
-          this->decodeInterest(netPkt, pkt);
+          this->decodeInterest(netPkt, firstPkt);
         }
         break;
       case tlv::Data:
-        this->decodeData(netPkt, pkt);
+        this->decodeData(netPkt, firstPkt);
         break;
       default:
+        ++this->nInNetInvalid; // an unknown TLV-TYPE is counted as an invalid network-layer packet
         NFD_LOG_FACE_WARN("unrecognized network-layer packet TLV-TYPE " << netPkt.type() << ": DROP");
         return;
     }
   }
   catch (const tlv::Error& e) {
+    ++this->nInNetInvalid; // a tlv::Error raised while decoding is counted the same way
     NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
   }
 }
 
-
 void
 GenericLinkService::decodeInterest(const Block& netPkt, const lp::Packet& firstPkt)
 {
@@ -179,6 +258,7 @@
   }
 
   if (firstPkt.has<lp::CachePolicyField>()) {
+    ++this->nInNetInvalid;
     NFD_LOG_FACE_WARN("received CachePolicy with Interest: DROP");
     return;
   }
@@ -199,11 +279,13 @@
   auto data = make_shared<Data>(netPkt);
 
   if (firstPkt.has<lp::NackField>()) {
+    ++this->nInNetInvalid;
     NFD_LOG_FACE_WARN("received Nack with Data: DROP");
     return;
   }
 
   if (firstPkt.has<lp::NextHopFaceIdField>()) {
+    ++this->nInNetInvalid;
     NFD_LOG_FACE_WARN("received NextHopFaceId with Data: DROP");
     return;
   }
@@ -216,6 +298,7 @@
           data->setCachingPolicy(ndn::nfd::LocalControlHeader::CachingPolicy::NO_CACHE);
           break;
         default:
+          ++this->nInNetInvalid;
           NFD_LOG_FACE_WARN("unrecognized CachePolicyType " << policy << ": DROP");
           return;
       }
@@ -242,11 +325,13 @@
   nack.setHeader(firstPkt.get<lp::NackField>());
 
   if (firstPkt.has<lp::NextHopFaceIdField>()) {
+    ++this->nInNetInvalid;
     NFD_LOG_FACE_WARN("received NextHopFaceId with Nack: DROP");
     return;
   }
 
   if (firstPkt.has<lp::CachePolicyField>()) {
+    ++this->nInNetInvalid;
     NFD_LOG_FACE_WARN("received CachePolicy with Nack: DROP");
     return;
   }