net: support multiple concurrent netlink requests

This is in preparation for generic netlink support.

Change-Id: I3f648518800176015cf7435b4e61e6e73c83e796
Refs: #4020
diff --git a/src/net/detail/netlink-socket.cpp b/src/net/detail/netlink-socket.cpp
index b564afe..ea5a421 100644
--- a/src/net/detail/netlink-socket.cpp
+++ b/src/net/detail/netlink-socket.cpp
@@ -120,13 +120,23 @@
 }
 
 void
-NetlinkSocket::startAsyncReceive(MessageCallback cb)
+NetlinkSocket::registerNotificationCallback(MessageCallback cb)
 {
-  BOOST_ASSERT(cb != nullptr);
-  BOOST_ASSERT(m_onMessage == nullptr);
+  registerRequestCallback(0, std::move(cb));
+}
 
-  m_onMessage = std::move(cb);
-  asyncWait();
+void
+NetlinkSocket::registerRequestCallback(uint32_t seq, MessageCallback cb)
+{
+  if (cb == nullptr) {
+    m_pendingRequests.erase(seq);
+  }
+  else {
+    bool wasEmpty = m_pendingRequests.empty();
+    m_pendingRequests.emplace(seq, std::move(cb));
+    if (wasEmpty)
+      asyncWait();
+  }
 }
 
 static const char*
@@ -171,7 +181,8 @@
       }
       else {
         receiveAndValidate();
-        asyncWait();
+        if (!m_pendingRequests.empty())
+          asyncWait();
       }
   });
 }
@@ -236,22 +247,43 @@
                   " pid=" << nlmsg->nlmsg_pid <<
                   " group=" << nlGroup);
 
-    if (nlGroup == 0 && // not a multicast notification
-        (nlmsg->nlmsg_pid != m_pid || nlmsg->nlmsg_seq != m_seqNum)) { // not for us
-      NDN_LOG_TRACE("seq/pid mismatch, ignoring");
+    auto cbIt = m_pendingRequests.end();
+    if (nlGroup != 0) {
+      // it's a multicast notification
+      cbIt = m_pendingRequests.find(0);
+    }
+    else if (nlmsg->nlmsg_pid == m_pid) {
+      // it's for us
+      cbIt = m_pendingRequests.find(nlmsg->nlmsg_seq);
+    }
+    else {
+      NDN_LOG_TRACE("pid mismatch, ignoring");
       continue;
     }
 
-    if (nlmsg->nlmsg_flags & NLM_F_DUMP_INTR) {
+    if (cbIt == m_pendingRequests.end()) {
+      NDN_LOG_TRACE("no handler registered, ignoring");
+      continue;
+    }
+    else if (nlmsg->nlmsg_flags & NLM_F_DUMP_INTR) {
       NDN_LOG_ERROR("dump is inconsistent");
       BOOST_THROW_EXCEPTION(Error("Inconsistency detected in netlink dump"));
       // TODO: discard the rest of the message and retry the dump
     }
+    else {
+      // invoke the callback
+      BOOST_ASSERT(cbIt->second);
+      cbIt->second(nlmsg);
+    }
 
-    m_onMessage(nlmsg);
-
-    if (nlmsg->nlmsg_type == NLMSG_DONE) {
-      break;
+    // garbage collect the handler if we don't need it anymore:
+    // do it only if this is a reply message (i.e. not a notification) and either
+    //   (1) it's not a multi-part message, in which case this is the only fragment, or
+    //   (2) it's the last fragment of a multi-part message
+    if (nlGroup == 0 && (!(nlmsg->nlmsg_flags & NLM_F_MULTI) || nlmsg->nlmsg_type == NLMSG_DONE)) {
+      NDN_LOG_TRACE("removing handler for seq=" << nlmsg->nlmsg_seq);
+      BOOST_ASSERT(cbIt != m_pendingRequests.end());
+      m_pendingRequests.erase(cbIt);
     }
   }
 }
@@ -269,7 +301,7 @@
 }
 
 void
-RtnlSocket::sendDumpRequest(uint16_t nlmsgType)
+RtnlSocket::sendDumpRequest(uint16_t nlmsgType, MessageCallback cb)
 {
   struct RtnlRequest
   {
@@ -290,6 +322,8 @@
   request->rtext = RTEXT_FILTER_SKIP_STATS;
   request->nlh.nlmsg_len = NLMSG_SPACE(sizeof(ifinfomsg)) + request->rta.rta_len;
 
+  registerRequestCallback(request->nlh.nlmsg_seq, std::move(cb));
+
   boost::asio::async_write(*m_sock, boost::asio::buffer(request.get(), request->nlh.nlmsg_len),
     // capture 'request' to prevent its premature deallocation
     [request] (const boost::system::error_code& ec, size_t) {