face: simplify and optimize Internal{Forwarder,Client}Transport

Refs: #4528
Change-Id: Ie3246382965640e0d2cb71116b6526e68925887c
diff --git a/daemon/face/internal-transport.cpp b/daemon/face/internal-transport.cpp
index 3e9552c..5b3fcac 100644
--- a/daemon/face/internal-transport.cpp
+++ b/daemon/face/internal-transport.cpp
@@ -46,21 +46,21 @@
 }
 
 void
-InternalForwarderTransport::receiveFromLink(const Block& packet)
+InternalForwarderTransport::receivePacket(Block&& packet)
 {
-  NFD_LOG_FACE_TRACE(__func__);
-
-  Packet p;
-  p.packet = packet;
-  this->receive(std::move(p));
+  getGlobalIoService().post([this, pkt = std::move(packet)] () mutable {
+    NFD_LOG_FACE_TRACE("Received: " << pkt.size() << " bytes");
+    receive(Packet{std::move(pkt)});
+  });
 }
 
 void
 InternalForwarderTransport::doSend(Packet&& packet)
 {
-  NFD_LOG_FACE_TRACE(__func__);
+  NFD_LOG_FACE_TRACE("Sending to " << m_peer);
 
-  this->emitSignal(afterSend, packet.packet);
+  if (m_peer)
+    m_peer->receivePacket(std::move(packet.packet));
 }
 
 void
@@ -68,50 +68,59 @@
 {
   NFD_LOG_FACE_TRACE(__func__);
 
-  this->setState(TransportState::CLOSED);
+  setState(TransportState::CLOSED);
 }
 
-static void
-asyncReceive(InternalTransportBase* recipient, const Block& packet)
+InternalClientTransport::~InternalClientTransport()
 {
-  getGlobalIoService().post([packet, recipient] {
-    recipient->receiveFromLink(packet);
-  });
+  if (m_forwarder != nullptr) {
+    m_forwarder->setPeer(nullptr);
+  }
 }
 
 void
-InternalClientTransport::connectToForwarder(InternalForwarderTransport* forwarderTransport)
+InternalClientTransport::connectToForwarder(InternalForwarderTransport* forwarder)
 {
-  NFD_LOG_DEBUG(__func__ << " " << forwarderTransport);
+  NFD_LOG_DEBUG(__func__ << " " << forwarder);
 
-  m_fwToClientTransmitConn.disconnect();
-  m_clientToFwTransmitConn.disconnect();
-  m_fwTransportStateConn.disconnect();
+  if (m_forwarder != nullptr) {
+    // disconnect from the old forwarder transport
+    m_forwarder->setPeer(nullptr);
+    m_fwTransportStateConn.disconnect();
+  }
 
-  if (forwarderTransport != nullptr) {
-    m_fwToClientTransmitConn = forwarderTransport->afterSend.connect(bind(&asyncReceive, this, _1));
-    m_clientToFwTransmitConn = this->afterSend.connect(bind(&asyncReceive, forwarderTransport, _1));
-    m_fwTransportStateConn = forwarderTransport->afterStateChange.connect(
+  m_forwarder = forwarder;
+
+  if (m_forwarder != nullptr) {
+    // connect to the new forwarder transport
+    m_forwarder->setPeer(this);
+    m_fwTransportStateConn = m_forwarder->afterStateChange.connect(
       [this] (TransportState oldState, TransportState newState) {
         if (newState == TransportState::CLOSED) {
-          this->connectToForwarder(nullptr);
+          connectToForwarder(nullptr);
         }
       });
   }
 }
 
 void
-InternalClientTransport::receiveFromLink(const Block& packet)
+InternalClientTransport::receivePacket(Block&& packet)
 {
-  if (m_receiveCallback) {
-    m_receiveCallback(packet);
-  }
+  getGlobalIoService().post([this, pkt = std::move(packet)] {
+    NFD_LOG_TRACE("Received: " << pkt.size() << " bytes");
+    if (m_receiveCallback) {
+      m_receiveCallback(pkt);
+    }
+  });
 }
 
 void
 InternalClientTransport::send(const Block& wire)
 {
-  this->emitSignal(afterSend, wire);
+  NFD_LOG_TRACE("Sending to " << m_forwarder);
+
+  if (m_forwarder)
+    m_forwarder->receivePacket(Block{wire});
 }
 
 void
@@ -121,7 +130,7 @@
   encoder.appendByteArray(header.wire(), header.size());
   encoder.appendByteArray(payload.wire(), payload.size());
 
-  this->send(encoder.block());
+  send(encoder.block());
 }
 
 } // namespace face
diff --git a/daemon/face/internal-transport.hpp b/daemon/face/internal-transport.hpp
index 9f834d7..f859099 100644
--- a/daemon/face/internal-transport.hpp
+++ b/daemon/face/internal-transport.hpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /*
- * Copyright (c) 2014-2018,  Regents of the University of California,
+ * Copyright (c) 2014-2019,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -33,7 +33,7 @@
 namespace nfd {
 namespace face {
 
-/** \brief abstracts a transport that can be paired with another
+/** \brief Abstracts a transport that can be paired with another.
  */
 class InternalTransportBase
 {
@@ -41,49 +41,53 @@
   virtual
   ~InternalTransportBase() = default;
 
-  /** \brief causes the transport to receive a link-layer packet
-   */
   virtual void
-  receiveFromLink(const Block& packet) = 0;
-
-  signal::Signal<InternalTransportBase, Block> afterSend;
-
-protected:
-  DECLARE_SIGNAL_EMIT(afterSend)
+  receivePacket(Block&& packet) = 0;
 };
 
-/** \brief implements a forwarder-side transport that can be paired with another
+/** \brief Implements a forwarder-side transport that can be paired with another transport.
  */
-class InternalForwarderTransport : public Transport, public InternalTransportBase
+class InternalForwarderTransport final : public Transport, public InternalTransportBase
 {
 public:
+  explicit
   InternalForwarderTransport(const FaceUri& localUri = FaceUri("internal://"),
                              const FaceUri& remoteUri = FaceUri("internal://"),
                              ndn::nfd::FaceScope scope = ndn::nfd::FACE_SCOPE_LOCAL,
                              ndn::nfd::LinkType linkType = ndn::nfd::LINK_TYPE_POINT_TO_POINT);
 
   void
-  receiveFromLink(const Block& packet) override;
+  setPeer(InternalTransportBase* peer)
+  {
+    m_peer = peer;
+  }
+
+  void
+  receivePacket(Block&& packet) final;
 
 protected:
   void
-  doClose() override;
+  doClose() final;
 
 private:
   void
-  doSend(Packet&& packet) override;
+  doSend(Packet&& packet) final;
 
 private:
   NFD_LOG_MEMBER_DECL();
+
+  InternalTransportBase* m_peer = nullptr;
 };
 
-/** \brief implements a client-side transport that can be paired with another
+/** \brief Implements a client-side transport that can be paired with an InternalForwarderTransport.
  */
-class InternalClientTransport : public ndn::Transport, public InternalTransportBase
+class InternalClientTransport final : public ndn::Transport, public InternalTransportBase
 {
 public:
-  /** \brief connect to a forwarder-side transport
-   *  \param forwarderTransport the forwarder-side transport to connect to; may be nullptr
+  ~InternalClientTransport() final;
+
+  /** \brief Connect to a forwarder-side transport.
+   *  \param forwarder the forwarder-side transport to connect to; may be nullptr
    *
    *  The connected forwarder-side transport will be disconnected automatically if this method
    *  is called again, or if that transport is closed.
@@ -91,37 +95,36 @@
    *  all sent packets would be lost, and nothing would be received.
    */
   void
-  connectToForwarder(InternalForwarderTransport* forwarderTransport);
+  connectToForwarder(InternalForwarderTransport* forwarder);
 
   void
-  receiveFromLink(const Block& packet) override;
+  receivePacket(Block&& packet) final;
 
   void
-  close() override
+  send(const Block& wire) final;
+
+  void
+  send(const Block& header, const Block& payload) final;
+
+  void
+  close() final
   {
   }
 
   void
-  pause() override
+  pause() final
   {
   }
 
   void
-  resume() override
+  resume() final
   {
   }
 
-  void
-  send(const Block& wire) override;
-
-  void
-  send(const Block& header, const Block& payload) override;
-
 private:
   NFD_LOG_MEMBER_DECL();
 
-  signal::ScopedConnection m_fwToClientTransmitConn;
-  signal::ScopedConnection m_clientToFwTransmitConn;
+  InternalForwarderTransport* m_forwarder = nullptr;
   signal::ScopedConnection m_fwTransportStateConn;
 };
 
diff --git a/tests/daemon/face/internal-face.t.cpp b/tests/daemon/face/internal-face.t.cpp
index 9ea6903..ecd1d00 100644
--- a/tests/daemon/face/internal-face.t.cpp
+++ b/tests/daemon/face/internal-face.t.cpp
@@ -42,7 +42,7 @@
 public:
   InternalFaceFixture()
   {
-    std::tie(forwarderFace, clientFace) = makeInternalFace(m_keyChain);;
+    std::tie(forwarderFace, clientFace) = makeInternalFace(m_keyChain);
 
     forwarderFace->afterReceiveInterest.connect(
       [this] (const Interest& interest) { receivedInterests.push_back(interest); } );
@@ -82,7 +82,7 @@
 
 BOOST_AUTO_TEST_CASE(ReceiveInterestTimeout)
 {
-  shared_ptr<Interest> interest = makeInterest("/TLETccRv");
+  auto interest = makeInterest("/TLETccRv");
   interest->setInterestLifetime(100_ms);
 
   bool hasTimeout = false;
@@ -102,7 +102,7 @@
 
 BOOST_AUTO_TEST_CASE(ReceiveInterestSendData)
 {
-  shared_ptr<Interest> interest = makeInterest("/PQstEJGdL");
+  auto interest = makeInterest("/PQstEJGdL");
 
   bool hasReceivedData = false;
   clientFace->expressInterest(*interest,
@@ -117,8 +117,7 @@
   BOOST_REQUIRE_EQUAL(receivedInterests.size(), 1);
   BOOST_CHECK_EQUAL(receivedInterests.back().getName(), "/PQstEJGdL");
 
-  shared_ptr<Data> data = makeData("/PQstEJGdL/aI7oCrDXNX");
-  forwarderFace->sendData(*data);
+  forwarderFace->sendData(*makeData("/PQstEJGdL/aI7oCrDXNX"));
   this->advanceClocks(1_ms, 10);
 
   BOOST_CHECK(hasReceivedData);
@@ -126,7 +125,7 @@
 
 BOOST_AUTO_TEST_CASE(ReceiveInterestSendNack)
 {
-  shared_ptr<Interest> interest = makeInterest("/1HrsRM1X", 152);
+  auto interest = makeInterest("/1HrsRM1X", 152);
 
   bool hasReceivedNack = false;
   clientFace->expressInterest(*interest,
@@ -141,8 +140,7 @@
   BOOST_REQUIRE_EQUAL(receivedInterests.size(), 1);
   BOOST_CHECK_EQUAL(receivedInterests.back().getName(), "/1HrsRM1X");
 
-  lp::Nack nack = makeNack("/1HrsRM1X", 152, lp::NackReason::NO_ROUTE);
-  forwarderFace->sendNack(nack);
+  forwarderFace->sendNack(makeNack("/1HrsRM1X", 152, lp::NackReason::NO_ROUTE));
   this->advanceClocks(1_ms, 10);
 
   BOOST_CHECK(hasReceivedNack);
@@ -156,12 +154,10 @@
       hasDeliveredInterest = true;
       BOOST_CHECK_EQUAL(interest.getName(), "/Wpc8TnEeoF/f6SzV8hD");
 
-      shared_ptr<Data> data = makeData("/Wpc8TnEeoF/f6SzV8hD/3uytUJCuIi");
-      clientFace->put(*data);
+      clientFace->put(*makeData("/Wpc8TnEeoF/f6SzV8hD/3uytUJCuIi"));
     });
 
-  shared_ptr<Interest> interest = makeInterest("/Wpc8TnEeoF/f6SzV8hD");
-  forwarderFace->sendInterest(*interest);
+  forwarderFace->sendInterest(*makeInterest("/Wpc8TnEeoF/f6SzV8hD"));
   this->advanceClocks(1_ms, 10);
 
   BOOST_CHECK(hasDeliveredInterest);
@@ -177,12 +173,10 @@
       hasDeliveredInterest = true;
       BOOST_CHECK_EQUAL(interest.getName(), "/4YgJKWcXN/5oaTe05o");
 
-      lp::Nack nack = makeNack("/4YgJKWcXN/5oaTe05o", 191, lp::NackReason::NO_ROUTE);
-      clientFace->put(nack);
+      clientFace->put(makeNack("/4YgJKWcXN/5oaTe05o", 191, lp::NackReason::NO_ROUTE));
     });
 
-  shared_ptr<Interest> interest = makeInterest("/4YgJKWcXN/5oaTe05o", 191);
-  forwarderFace->sendInterest(*interest);
+  forwarderFace->sendInterest(*makeInterest("/4YgJKWcXN/5oaTe05o", 191));
   this->advanceClocks(1_ms, 10);
 
   BOOST_CHECK(hasDeliveredInterest);
@@ -197,7 +191,7 @@
   BOOST_CHECK_EQUAL(forwarderFace->getState(), FaceState::CLOSED);
   forwarderFace.reset();
 
-  shared_ptr<Interest> interest = makeInterest("/zpHsVesu0B");
+  auto interest = makeInterest("/zpHsVesu0B");
   interest->setInterestLifetime(100_ms);
 
   bool hasTimeout = false;
@@ -216,8 +210,7 @@
   g_io.poll(); // #3248 workaround
   clientFace.reset();
 
-  shared_ptr<Interest> interest = makeInterest("/aau42XQqb");
-  forwarderFace->sendInterest(*interest);
+  forwarderFace->sendInterest(*makeInterest("/aau42XQqb"));
   BOOST_CHECK_NO_THROW(this->advanceClocks(1_ms, 10));
 }
 
diff --git a/tests/daemon/fw/topology-tester.cpp b/tests/daemon/fw/topology-tester.cpp
index c401aef..0e19b57 100644
--- a/tests/daemon/fw/topology-tester.cpp
+++ b/tests/daemon/fw/topology-tester.cpp
@@ -24,6 +24,7 @@
  */
 
 #include "topology-tester.hpp"
+
 #include "daemon/global.hpp"
 #include "face/generic-link-service.hpp"
 
@@ -33,13 +34,19 @@
 namespace fw {
 namespace tests {
 
-using face::InternalTransportBase;
-using face::InternalForwarderTransport;
-using face::InternalClientTransport;
 using face::GenericLinkService;
+using face::InternalClientTransport;
+using face::InternalForwarderTransport;
+
+TopologyLink::NodeTransport::NodeTransport(shared_ptr<Face> f, ReceiveProxy::Callback cb)
+  : face(std::move(f))
+  , transport(dynamic_cast<InternalForwarderTransport*>(face->getTransport()))
+  , proxy(std::move(cb))
+{
+  BOOST_ASSERT(transport != nullptr);
+}
 
 TopologyLink::TopologyLink(time::nanoseconds delay)
-  : m_isUp(true)
 {
   this->setDelay(delay);
 }
@@ -66,19 +73,19 @@
 void
 TopologyLink::addFace(TopologyNode i, shared_ptr<Face> face)
 {
-  BOOST_ASSERT(m_transports.count(i) == 0);
-  auto& nodeTransport = m_transports[i];
+  auto receiveCb = [this, i] (Block&& pkt) { transmit(i, std::move(pkt)); };
 
-  nodeTransport.face = face;
+  auto ret = m_transports.emplace(std::piecewise_construct,
+                                  std::forward_as_tuple(i),
+                                  std::forward_as_tuple(std::move(face), std::move(receiveCb)));
+  BOOST_ASSERT(ret.second);
 
-  nodeTransport.transport = dynamic_cast<InternalTransportBase*>(face->getTransport());
-  BOOST_ASSERT(nodeTransport.transport != nullptr);
-  nodeTransport.transport->afterSend.connect(
-    [this, i] (const Block& packet) { this->transmit(i, packet); });
+  auto& node = ret.first->second;
+  node.transport->setPeer(&node.proxy);
 }
 
 void
-TopologyLink::transmit(TopologyNode i, const Block& packet)
+TopologyLink::transmit(TopologyNode i, Block&& packet)
 {
   if (!m_isUp) {
     return;
@@ -91,22 +98,21 @@
       continue;
     }
 
-    InternalTransportBase* recipient = p.second.transport;
-    this->scheduleReceive(recipient, packet);
+    this->scheduleReceive(p.second.transport, Block{packet});
   }
 }
 
 void
-TopologyLink::scheduleReceive(InternalTransportBase* recipient, const Block& packet)
+TopologyLink::scheduleReceive(face::InternalTransportBase* recipient, Block&& packet)
 {
-  getScheduler().schedule(m_delay, [packet, recipient] {
-    recipient->receiveFromLink(packet);
+  getScheduler().schedule(m_delay, [=, pkt = std::move(packet)] () mutable {
+    recipient->receivePacket(std::move(pkt));
   });
 }
 
 TopologyAppLink::TopologyAppLink(shared_ptr<Face> forwarderFace)
-  : m_face(forwarderFace)
-  , m_forwarderTransport(static_cast<InternalForwarderTransport*>(forwarderFace->getTransport()))
+  : m_face(std::move(forwarderFace))
+  , m_forwarderTransport(static_cast<InternalForwarderTransport*>(m_face->getTransport()))
   , m_clientTransport(make_shared<InternalClientTransport>())
   , m_client(make_shared<ndn::Face>(m_clientTransport, getGlobalIoService()))
 {
@@ -190,7 +196,7 @@
     auto face = make_shared<Face>(std::move(service), std::move(transport));
 
     forwarder.addFace(face);
-    link->addFace(i, face);
+    link->addFace(i, std::move(face));
   }
 
   m_links.push_back(link); // keep a shared_ptr so callers don't have to
@@ -212,7 +218,7 @@
 
   forwarder.addFace(face);
 
-  auto al = make_shared<TopologyAppLink>(face);
+  auto al = make_shared<TopologyAppLink>(std::move(face));
   m_appLinks.push_back(al); // keep a shared_ptr so callers don't have to
   return al;
 }
diff --git a/tests/daemon/fw/topology-tester.hpp b/tests/daemon/fw/topology-tester.hpp
index cbd8fd9..2c8d7d0 100644
--- a/tests/daemon/fw/topology-tester.hpp
+++ b/tests/daemon/fw/topology-tester.hpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /*
- * Copyright (c) 2014-2018,  Regents of the University of California,
+ * Copyright (c) 2014-2019,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -108,21 +108,48 @@
 
 private:
   void
-  transmit(TopologyNode i, const Block& packet);
+  transmit(TopologyNode i, Block&& packet);
 
   void
-  scheduleReceive(face::InternalTransportBase* recipient, const Block& packet);
+  scheduleReceive(face::InternalTransportBase* recipient, Block&& packet);
 
 private:
-  bool m_isUp;
+  bool m_isUp = true;
   time::nanoseconds m_delay;
 
-  struct NodeTransport
+  class ReceiveProxy : public face::InternalTransportBase
   {
-    face::InternalTransportBase* transport;
+  public:
+    using Callback = std::function<void(Block&&)>;
+
+    explicit
+    ReceiveProxy(Callback cb)
+      : m_cb(std::move(cb))
+    {
+    }
+
+    void
+    receivePacket(Block&& packet) final
+    {
+      m_cb(std::move(packet));
+    }
+
+  private:
+    Callback m_cb;
+  };
+
+  class NodeTransport
+  {
+  public:
+    NodeTransport(shared_ptr<Face> face, ReceiveProxy::Callback receiveCallback);
+
+  public:
     shared_ptr<Face> face;
+    face::InternalForwarderTransport* transport;
+    ReceiveProxy proxy;
     std::set<TopologyNode> blockedDestinations;
   };
+
   std::unordered_map<TopologyNode, NodeTransport> m_transports;
 };