face: simplify and optimize Internal{Forwarder,Client}Transport
Refs: #4528
Change-Id: Ie3246382965640e0d2cb71116b6526e68925887c
diff --git a/daemon/face/internal-transport.cpp b/daemon/face/internal-transport.cpp
index 3e9552c..5b3fcac 100644
--- a/daemon/face/internal-transport.cpp
+++ b/daemon/face/internal-transport.cpp
@@ -46,21 +46,21 @@
}
void
-InternalForwarderTransport::receiveFromLink(const Block& packet)
+InternalForwarderTransport::receivePacket(Block&& packet)
{
- NFD_LOG_FACE_TRACE(__func__);
-
- Packet p;
- p.packet = packet;
- this->receive(std::move(p));
+ getGlobalIoService().post([this, pkt = std::move(packet)] () mutable {
+ NFD_LOG_FACE_TRACE("Received: " << pkt.size() << " bytes");
+ receive(Packet{std::move(pkt)});
+ });
}
void
InternalForwarderTransport::doSend(Packet&& packet)
{
- NFD_LOG_FACE_TRACE(__func__);
+ NFD_LOG_FACE_TRACE("Sending to " << m_peer);
- this->emitSignal(afterSend, packet.packet);
+ if (m_peer)
+ m_peer->receivePacket(std::move(packet.packet));
}
void
@@ -68,50 +68,59 @@
{
NFD_LOG_FACE_TRACE(__func__);
- this->setState(TransportState::CLOSED);
+ setState(TransportState::CLOSED);
}
-static void
-asyncReceive(InternalTransportBase* recipient, const Block& packet)
+InternalClientTransport::~InternalClientTransport()
{
- getGlobalIoService().post([packet, recipient] {
- recipient->receiveFromLink(packet);
- });
+ if (m_forwarder != nullptr) {
+ m_forwarder->setPeer(nullptr);
+ }
}
void
-InternalClientTransport::connectToForwarder(InternalForwarderTransport* forwarderTransport)
+InternalClientTransport::connectToForwarder(InternalForwarderTransport* forwarder)
{
- NFD_LOG_DEBUG(__func__ << " " << forwarderTransport);
+ NFD_LOG_DEBUG(__func__ << " " << forwarder);
- m_fwToClientTransmitConn.disconnect();
- m_clientToFwTransmitConn.disconnect();
- m_fwTransportStateConn.disconnect();
+ if (m_forwarder != nullptr) {
+ // disconnect from the old forwarder transport
+ m_forwarder->setPeer(nullptr);
+ m_fwTransportStateConn.disconnect();
+ }
- if (forwarderTransport != nullptr) {
- m_fwToClientTransmitConn = forwarderTransport->afterSend.connect(bind(&asyncReceive, this, _1));
- m_clientToFwTransmitConn = this->afterSend.connect(bind(&asyncReceive, forwarderTransport, _1));
- m_fwTransportStateConn = forwarderTransport->afterStateChange.connect(
+ m_forwarder = forwarder;
+
+ if (m_forwarder != nullptr) {
+ // connect to the new forwarder transport
+ m_forwarder->setPeer(this);
+ m_fwTransportStateConn = m_forwarder->afterStateChange.connect(
[this] (TransportState oldState, TransportState newState) {
if (newState == TransportState::CLOSED) {
- this->connectToForwarder(nullptr);
+ connectToForwarder(nullptr);
}
});
}
}
void
-InternalClientTransport::receiveFromLink(const Block& packet)
+InternalClientTransport::receivePacket(Block&& packet)
{
- if (m_receiveCallback) {
- m_receiveCallback(packet);
- }
+ getGlobalIoService().post([this, pkt = std::move(packet)] {
+ NFD_LOG_TRACE("Received: " << pkt.size() << " bytes");
+ if (m_receiveCallback) {
+ m_receiveCallback(pkt);
+ }
+ });
}
void
InternalClientTransport::send(const Block& wire)
{
- this->emitSignal(afterSend, wire);
+ NFD_LOG_TRACE("Sending to " << m_forwarder);
+
+ if (m_forwarder)
+ m_forwarder->receivePacket(Block{wire});
}
void
@@ -121,7 +130,7 @@
encoder.appendByteArray(header.wire(), header.size());
encoder.appendByteArray(payload.wire(), payload.size());
- this->send(encoder.block());
+ send(encoder.block());
}
} // namespace face