face: replace EventEmitter usage with Signal
refs #2300
Change-Id: I17d0d65d2c474b17dd9f23f271a6144b0b4fbc07
diff --git a/daemon/face/datagram-face.hpp b/daemon/face/datagram-face.hpp
index 5b1b3d9..f0ad2b4 100644
--- a/daemon/face/datagram-face.hpp
+++ b/daemon/face/datagram-face.hpp
@@ -129,7 +129,7 @@
inline void
DatagramFace<T, U>::sendInterest(const Interest& interest)
{
- this->onSendInterest(interest);
+ this->emitSignal(onSendInterest, interest);
const Block& payload = interest.wireEncode();
m_socket->async_send(boost::asio::buffer(payload.wire(), payload.size()),
bind(&DatagramFace<T, U>::handleSend, this, _1, _2, payload));
@@ -141,7 +141,7 @@
inline void
DatagramFace<T, U>::sendData(const Data& data)
{
- this->onSendData(data);
+ this->emitSignal(onSendData, data);
const Block& payload = data.wireEncode();
m_socket->async_send(boost::asio::buffer(payload.wire(), payload.size()),
bind(&DatagramFace<T, U>::handleSend, this, _1, _2, payload));
@@ -176,8 +176,7 @@
fail("Tunnel closed");
}
else {
- fail("Send operation failed, closing socket: " +
- error.category().message(error.value()));
+ fail("Send operation failed, closing socket: " + error.message());
}
return;
}
diff --git a/daemon/face/ethernet-face.cpp b/daemon/face/ethernet-face.cpp
index c63e38e..f1fd7c3 100644
--- a/daemon/face/ethernet-face.cpp
+++ b/daemon/face/ethernet-face.cpp
@@ -123,7 +123,7 @@
void
EthernetFace::sendInterest(const Interest& interest)
{
- onSendInterest(interest);
+ this->emitSignal(onSendInterest, interest);
ndnlp::PacketArray pa = m_slicer->slice(interest.wireEncode());
for (const auto& packet : *pa) {
sendPacket(packet);
@@ -133,7 +133,7 @@
void
EthernetFace::sendData(const Data& data)
{
- onSendData(data);
+ this->emitSignal(onSendData, data);
ndnlp::PacketArray pa = m_slicer->slice(data.wireEncode());
for (const auto& packet : *pa) {
sendPacket(packet);
@@ -387,7 +387,7 @@
{
// new sender, setup a PartialMessageStore for it
reassembler.pms.reset(new ndnlp::PartialMessageStore);
- reassembler.pms->onReceive +=
+ reassembler.pms->onReceive.connect(
[this, sourceAddress] (const Block& block) {
NFD_LOG_TRACE("[id:" << getId() << ",endpoint:" << m_interfaceName
<< "] All fragments received from " << sourceAddress.toString());
@@ -395,7 +395,7 @@
NFD_LOG_WARN("[id:" << getId() << ",endpoint:" << m_interfaceName
<< "] Received unrecognized TLV block of type " << block.type()
<< " from " << sourceAddress.toString());
- };
+ });
}
scheduler::cancel(reassembler.expireEvent);
diff --git a/daemon/face/ethernet-factory.cpp b/daemon/face/ethernet-factory.cpp
index a4579f4..d07efa4 100644
--- a/daemon/face/ethernet-factory.cpp
+++ b/daemon/face/ethernet-factory.cpp
@@ -48,9 +48,9 @@
face = make_shared<EthernetFace>(socket, interface, address);
auto key = std::make_pair(interface.name, address);
- face->onFail += [this, key] (const std::string& reason) {
+ face->onFail.connectSingleShot([this, key] (const std::string& reason) {
m_multicastFaces.erase(key);
- };
+ });
m_multicastFaces.insert({key, face});
return face;
diff --git a/daemon/face/face.cpp b/daemon/face/face.cpp
index 7d3c1b7..0adf2ed 100644
--- a/daemon/face/face.cpp
+++ b/daemon/face/face.cpp
@@ -36,10 +36,10 @@
, m_isOnDemand(false)
, m_isFailed(false)
{
- onReceiveInterest += [this](const ndn::Interest&) { ++m_counters.getNInInterests(); };
- onReceiveData += [this](const ndn::Data&) { ++m_counters.getNInDatas(); };
- onSendInterest += [this](const ndn::Interest&) { ++m_counters.getNOutInterests(); };
- onSendData += [this](const ndn::Data&) { ++m_counters.getNOutDatas(); };
+ onReceiveInterest.connect([this] (const ndn::Interest&) { ++m_counters.getNInInterests(); });
+ onReceiveData .connect([this] (const ndn::Data&) { ++m_counters.getNInDatas(); });
+ onSendInterest .connect([this] (const ndn::Interest&) { ++m_counters.getNOutInterests(); });
+ onSendData .connect([this] (const ndn::Data&) { ++m_counters.getNOutDatas(); });
}
Face::~Face()
@@ -120,8 +120,6 @@
m_isFailed = true;
this->onFail(reason);
-
- this->onFail.clear();
}
template<typename FaceTraits>
diff --git a/daemon/face/face.hpp b/daemon/face/face.hpp
index 40a77a1..156a82d 100644
--- a/daemon/face/face.hpp
+++ b/daemon/face/face.hpp
@@ -78,19 +78,19 @@
~Face();
/// fires when an Interest is received
- EventEmitter<Interest> onReceiveInterest;
+ signal::Signal<Face, Interest> onReceiveInterest;
/// fires when a Data is received
- EventEmitter<Data> onReceiveData;
+ signal::Signal<Face, Data> onReceiveData;
/// fires when an Interest is sent out
- EventEmitter<Interest> onSendInterest;
+ signal::Signal<Face, Interest> onSendInterest;
/// fires when a Data is sent out
- EventEmitter<Data> onSendData;
+ signal::Signal<Face, Data> onSendData;
/// fires when face disconnects or fails to perform properly
- EventEmitter<std::string/*reason*/> onFail;
+ signal::Signal<Face, std::string/*reason*/> onFail;
/// send an Interest
virtual void
@@ -187,6 +187,11 @@
void
fail(const std::string& reason);
+ DECLARE_SIGNAL_EMIT(onReceiveInterest)
+ DECLARE_SIGNAL_EMIT(onReceiveData)
+ DECLARE_SIGNAL_EMIT(onSendInterest)
+ DECLARE_SIGNAL_EMIT(onSendData)
+
private:
void
setId(FaceId faceId);
diff --git a/daemon/face/local-face.hpp b/daemon/face/local-face.hpp
index e1c34c3..c190956 100644
--- a/daemon/face/local-face.hpp
+++ b/daemon/face/local-face.hpp
@@ -156,7 +156,7 @@
this->isLocalControlHeaderEnabled(LOCAL_CONTROL_FEATURE_NEXT_HOP_FACE_ID));
}
- this->onReceiveInterest(*i);
+ this->emitSignal(onReceiveInterest, *i);
}
else if (payload.type() == tlv::Data)
{
@@ -174,7 +174,7 @@
// false);
// }
- this->onReceiveData(*d);
+ this->emitSignal(onReceiveData, *d);
}
else
return false;
diff --git a/daemon/face/multicast-udp-face.cpp b/daemon/face/multicast-udp-face.cpp
index 34c6cd2..7e91ca5 100644
--- a/daemon/face/multicast-udp-face.cpp
+++ b/daemon/face/multicast-udp-face.cpp
@@ -61,7 +61,7 @@
void
MulticastUdpFace::sendInterest(const Interest& interest)
{
- onSendInterest(interest);
+ this->emitSignal(onSendInterest, interest);
NFD_LOG_DEBUG("Sending interest");
sendBlock(interest.wireEncode());
@@ -70,9 +70,9 @@
void
MulticastUdpFace::sendData(const Data& data)
{
- /// \todo After this method implements duplicate suppression, onSendData event should
- /// be triggered only when data is actually sent out
- onSendData(data);
+ /// \todo After this method implements duplicate suppression, onSendData signal should
+ /// be emitted only when data is actually sent out
+ this->emitSignal(onSendData, data);
NFD_LOG_DEBUG("Sending data");
sendBlock(data.wireEncode());
diff --git a/daemon/face/ndnlp-partial-message-store.hpp b/daemon/face/ndnlp-partial-message-store.hpp
index c707d92..1aed1e0 100644
--- a/daemon/face/ndnlp-partial-message-store.hpp
+++ b/daemon/face/ndnlp-partial-message-store.hpp
@@ -1,11 +1,12 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014 Regents of the University of California,
- * Arizona Board of Regents,
- * Colorado State University,
- * University Pierre & Marie Curie, Sorbonne University,
- * Washington University in St. Louis,
- * Beijing Institute of Technology
+ * Copyright (c) 2014, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis
*
* This file is part of NFD (Named Data Networking Forwarding Daemon).
* See AUTHORS.md for complete list of NFD authors and contributors.
@@ -20,7 +21,7 @@
*
* You should have received a copy of the GNU General Public License along with
* NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#ifndef NFD_DAEMON_FACE_NDNLP_PARTIAL_MESSAGE_STORE_HPP
#define NFD_DAEMON_FACE_NDNLP_PARTIAL_MESSAGE_STORE_HPP
@@ -84,7 +85,7 @@
receiveNdnlpData(const Block& pkt);
/// fires when network layer packet is received
- EventEmitter<Block> onReceive;
+ signal::Signal<PartialMessageStore, Block> onReceive;
private:
void
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
index e719da3..e7cf894 100644
--- a/daemon/face/stream-face.hpp
+++ b/daemon/face/stream-face.hpp
@@ -178,7 +178,7 @@
inline void
StreamFace<T, U>::sendInterest(const Interest& interest)
{
- this->onSendInterest(interest);
+ this->emitSignal(onSendInterest, interest);
StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
}
@@ -186,7 +186,7 @@
inline void
StreamFace<T, U>::sendData(const Data& data)
{
- this->onSendData(data);
+ this->emitSignal(onSendData, data);
StreamFaceSenderImpl<T, U, Data>::send(*this, data);
}
diff --git a/daemon/face/tcp-channel.cpp b/daemon/face/tcp-channel.cpp
index cd2744d..01eb07a 100644
--- a/daemon/face/tcp-channel.cpp
+++ b/daemon/face/tcp-channel.cpp
@@ -140,7 +140,7 @@
else
face = make_shared<TcpFace>(socket, isOnDemand);
- face->onFail += bind(&TcpChannel::afterFaceFailed, this, remoteEndpoint);
+ face->onFail.connectSingleShot(bind(&TcpChannel::afterFaceFailed, this, remoteEndpoint));
m_channelFaces[remoteEndpoint] = face;
}
diff --git a/daemon/face/udp-channel.cpp b/daemon/face/udp-channel.cpp
index 2ab4b77..83bb607 100644
--- a/daemon/face/udp-channel.cpp
+++ b/daemon/face/udp-channel.cpp
@@ -177,7 +177,8 @@
if (faceMapPos == m_channelFaces.end())
{
face = make_shared<UdpFace>(socket, isOnDemand, m_idleFaceTimeout);
- face->onFail += bind(&UdpChannel::afterFaceFailed, this, remoteEndpoint);
+
+ face->onFail.connectSingleShot(bind(&UdpChannel::afterFaceFailed, this, remoteEndpoint));
m_channelFaces[remoteEndpoint] = face;
}
diff --git a/daemon/face/udp-face.cpp b/daemon/face/udp-face.cpp
index 81e915c..400e7dc 100644
--- a/daemon/face/udp-face.cpp
+++ b/daemon/face/udp-face.cpp
@@ -24,6 +24,7 @@
*/
#include "udp-face.hpp"
+// #include "core/global-io.hpp" // for #1718 manual test below
#ifdef __linux__
#include <netinet/in.h> // for IP_MTU_DISCOVER and IP_PMTUDISC_DONT
@@ -112,6 +113,12 @@
<< ",uri:" << this->getRemoteUri()
<< "] Idle for more than " << m_idleTimeout << ", closing");
close();
+
+ // #1718 manual test: uncomment, run NFD in valgrind, send in a UDP packet
+ // expect read-after-free error and crash
+ // getGlobalIoService().post([this] {
+ // NFD_LOG_ERROR("Remaining references: " << this->shared_from_this().use_count());
+ // });
}
else {
resetRecentUsage();
diff --git a/daemon/face/udp-factory.cpp b/daemon/face/udp-factory.cpp
index 458d4fb..40d1788 100644
--- a/daemon/face/udp-factory.cpp
+++ b/daemon/face/udp-factory.cpp
@@ -237,7 +237,8 @@
multicastFace = make_shared<MulticastUdpFace>(receiveSocket, sendSocket,
localEndpoint, multicastEndpoint);
- multicastFace->onFail += bind(&UdpFactory::afterFaceFailed, this, localEndpoint);
+
+ multicastFace->onFail.connectSingleShot(bind(&UdpFactory::afterFaceFailed, this, localEndpoint));
m_multicastFaces[localEndpoint] = multicastFace;
diff --git a/daemon/face/websocket-face.cpp b/daemon/face/websocket-face.cpp
index abd8b3c..ad344d1 100644
--- a/daemon/face/websocket-face.cpp
+++ b/daemon/face/websocket-face.cpp
@@ -47,7 +47,7 @@
if (m_closed)
return;
- this->onSendInterest(interest);
+ this->emitSignal(onSendInterest, interest);
const Block& payload = interest.wireEncode();
this->getMutableCounters().getNOutBytes() += payload.size();
@@ -67,7 +67,7 @@
if (m_closed)
return;
- this->onSendData(data);
+ this->emitSignal(onSendData, data);
const Block& payload = data.wireEncode();
this->getMutableCounters().getNOutBytes() += payload.size();
diff --git a/daemon/fw/face-table.cpp b/daemon/fw/face-table.cpp
index f1db53b..71a7cf6 100644
--- a/daemon/fw/face-table.cpp
+++ b/daemon/fw/face-table.cpp
@@ -85,12 +85,9 @@
NFD_LOG_INFO("Added face id=" << faceId << " remote=" << face->getRemoteUri()
<< " local=" << face->getLocalUri());
- face->onReceiveInterest += bind(&Forwarder::onInterest,
- &m_forwarder, ref(*face), _1);
- face->onReceiveData += bind(&Forwarder::onData,
- &m_forwarder, ref(*face), _1);
- face->onFail += bind(&FaceTable::remove,
- this, face);
+ face->onReceiveInterest.connect(bind(&Forwarder::onInterest, &m_forwarder, ref(*face), _1));
+ face->onReceiveData.connect(bind(&Forwarder::onData, &m_forwarder, ref(*face), _1));
+ face->onFail.connectSingleShot(bind(&FaceTable::remove, this, face));
this->onAdd(face);
}
@@ -104,15 +101,7 @@
m_faces.erase(faceId);
face->setId(INVALID_FACEID);
NFD_LOG_INFO("Removed face id=" << faceId << " remote=" << face->getRemoteUri() <<
- " local=" << face->getLocalUri());
-
- // XXX This clears all subscriptions, because EventEmitter
- // does not support only removing Forwarder's subscription
- face->onReceiveInterest.clear();
- face->onReceiveData .clear();
- face->onSendInterest .clear();
- face->onSendData .clear();
- // don't clear onFail because other functions may need to execute
+ " local=" << face->getLocalUri());
m_forwarder.getFib().removeNextHopFromAllEntries(face);
}
diff --git a/daemon/mgmt/internal-face.cpp b/daemon/mgmt/internal-face.cpp
index 9fad328..d33282e 100644
--- a/daemon/mgmt/internal-face.cpp
+++ b/daemon/mgmt/internal-face.cpp
@@ -39,7 +39,7 @@
void
InternalFace::sendInterest(const Interest& interest)
{
- onSendInterest(interest);
+ this->emitSignal(onSendInterest, interest);
// Invoke .processInterest a bit later,
// to avoid potential problems in forwarding pipelines.
@@ -122,7 +122,7 @@
void
InternalFace::sendData(const Data& data)
{
- onSendData(data);
+ this->emitSignal(onSendData, data);
}
void
@@ -142,7 +142,7 @@
void
InternalFace::put(const Data& data)
{
- onReceiveData(data);
+ this->emitSignal(onReceiveData, data);
}
InternalFace::~InternalFace()