face: implement NDNLP fragmentation in EthernetFace

Change-Id: I5b9ff271087ca0b2d5ee38be2f77911cfe9283f4
Refs: #1209
diff --git a/daemon/face/ethernet-face.cpp b/daemon/face/ethernet-face.cpp
index 0e13844..21a12a4 100644
--- a/daemon/face/ethernet-face.cpp
+++ b/daemon/face/ethernet-face.cpp
@@ -60,6 +60,8 @@
 
 NFD_LOG_INIT("EthernetFace");
 
+const time::nanoseconds EthernetFace::REASSEMBLER_LIFETIME = time::seconds(60);
+
 EthernetFace::EthernetFace(const shared_ptr<boost::asio::posix::stream_descriptor>& socket,
                            const NetworkInterfaceInfo& interface,
                            const ethernet::Address& address)
@@ -90,6 +92,8 @@
   NFD_LOG_DEBUG("[id:" << getId() << ",endpoint:" << m_interfaceName
                 << "] Interface MTU is: " << m_interfaceMtu);
 
+  m_slicer.reset(new ndnlp::Slicer(m_interfaceMtu));
+
   char filter[100];
   std::snprintf(filter, sizeof(filter),
                 "(ether proto 0x%x) && (ether dst %s) && (not ether src %s)",
@@ -114,14 +118,20 @@
 EthernetFace::sendInterest(const Interest& interest)
 {
   onSendInterest(interest);
-  sendPacket(interest.wireEncode());
+  ndnlp::PacketArray pa = m_slicer->slice(interest.wireEncode());
+  for (const auto& packet : *pa) {
+    sendPacket(packet);
+  }
 }
 
 void
 EthernetFace::sendData(const Data& data)
 {
   onSendData(data);
-  sendPacket(data.wireEncode());
+  ndnlp::PacketArray pa = m_slicer->slice(data.wireEncode());
+  for (const auto& packet : *pa) {
+    sendPacket(packet);
+  }
 }
 
 void
@@ -257,13 +267,7 @@
       return fail("Face closed");
     }
 
-  /// \todo Fragmentation
-  if (block.size() > m_interfaceMtu)
-    {
-      NFD_LOG_ERROR("[id:" << getId() << ",endpoint:" << m_interfaceName
-                    << "] Fragmentation not implemented: dropping packet larger than MTU");
-      return;
-    }
+  BOOST_ASSERT(block.size() <= m_interfaceMtu);
 
   /// \todo Right now there is no reserve when packet is received, but
   ///       we should reserve some space at the beginning and at the end
@@ -341,11 +345,12 @@
     }
 
   const ether_header* eh = reinterpret_cast<const ether_header*>(packet);
+  const ethernet::Address sourceAddress(eh->ether_shost);
 
   // assert in case BPF fails to filter unwanted frames
   BOOST_ASSERT_MSG(ethernet::Address(eh->ether_dhost) == m_destAddress,
                    "Received frame addressed to a different multicast group");
-  BOOST_ASSERT_MSG(ethernet::Address(eh->ether_shost) != m_srcAddress,
+  BOOST_ASSERT_MSG(sourceAddress != m_srcAddress,
                    "Received frame sent by this host");
   BOOST_ASSERT_MSG(ntohs(eh->ether_type) == ethernet::ETHERTYPE_NDN,
                    "Received frame with unrecognized ethertype");
@@ -354,25 +359,51 @@
   length -= ethernet::HDR_LEN;
 
   /// \todo Reserve space in front and at the back of the underlying buffer
-  Block element;
-  bool isOk = Block::fromBuffer(packet, length, element);
-  if (isOk)
-    {
-      NFD_LOG_TRACE("[id:" << getId() << ",endpoint:" << m_interfaceName
-                    << "] Received: " << element.size() << " bytes");
-      this->getMutableCounters().getNInBytes() += element.size();
-
-      if (!decodeAndDispatchInput(element))
-        {
-          NFD_LOG_WARN("[id:" << getId() << ",endpoint:" << m_interfaceName
-                       << "] Received unrecognized block of type " << element.type());
-        }
-    }
-  else
+  Block fragment;
+  bool isOk = Block::fromBuffer(packet, length, fragment);
+  if (!isOk)
     {
       NFD_LOG_WARN("[id:" << getId() << ",endpoint:" << m_interfaceName
-                   << "] Received block is invalid or too large to process");
+                   << "] Block received from " << sourceAddress.toString()
+                   << " is invalid or too large to process");
+      return;
     }
+
+  NFD_LOG_TRACE("[id:" << getId() << ",endpoint:" << m_interfaceName
+                << "] Received: " << fragment.size() << " bytes from "
+                << sourceAddress.toString());
+  this->getMutableCounters().getNInBytes() += fragment.size();
+
+  Reassembler& reassembler = m_reassemblers[sourceAddress];
+  if (!reassembler.pms)
+    {
+      // new sender, setup a PartialMessageStore for it
+      reassembler.pms.reset(new ndnlp::PartialMessageStore);
+      reassembler.pms->onReceive +=
+        [this, sourceAddress] (const Block& block) {
+          NFD_LOG_TRACE("[id:" << getId() << ",endpoint:" << m_interfaceName
+                        << "] All fragments received from " << sourceAddress.toString());
+          if (!decodeAndDispatchInput(block))
+            NFD_LOG_WARN("[id:" << getId() << ",endpoint:" << m_interfaceName
+                         << "] Received unrecognized TLV block of type " << block.type()
+                         << " from " << sourceAddress.toString());
+        };
+    }
+
+  scheduler::cancel(reassembler.expireEvent);
+  reassembler.expireEvent = scheduler::schedule(REASSEMBLER_LIFETIME,
+    [this, sourceAddress] {
+      BOOST_VERIFY(m_reassemblers.erase(sourceAddress) == 1);
+    });
+
+  try {
+    reassembler.pms->receiveNdnlpData(fragment);
+  }
+  catch (const ndnlp::ParseError& e) {
+    NFD_LOG_WARN("[id:" << getId() << ",endpoint:" << m_interfaceName
+                 << "] Received invalid NDNLP fragment from "
+                 << sourceAddress.toString() << " : " << e.what());
+  }
 }
 
 void