face: Refactor internal PIT to use scheduled events
The commit also changes how face is paused when there are no pending
interests left and there are no registered prefixes with local
forwarder: data structures for pending interests and registered prefixes
will fire up a signal when they become empty.
Change-Id: I6b87a44b0c8bc766865a51962ecacaec85b4adad
Refs: #1372, #2518
diff --git a/src/detail/face-impl.hpp b/src/detail/face-impl.hpp
index cb5b3bc..fb6642a 100644
--- a/src/detail/face-impl.hpp
+++ b/src/detail/face-impl.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2014 Regents of the University of California.
+ * Copyright (c) 2013-2015 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -27,9 +27,11 @@
#include "registered-prefix.hpp"
#include "pending-interest.hpp"
+#include "container-with-on-empty-signal.hpp"
#include "../util/scheduler.hpp"
#include "../util/config-file.hpp"
+#include "../util/signal.hpp"
#include "../transport/transport.hpp"
#include "../transport/unix-transport.hpp"
@@ -43,14 +45,26 @@
class Face::Impl : noncopyable
{
public:
- typedef std::list<shared_ptr<PendingInterest> > PendingInterestTable;
+ typedef ContainerWithOnEmptySignal<shared_ptr<PendingInterest>> PendingInterestTable;
typedef std::list<shared_ptr<InterestFilterRecord> > InterestFilterTable;
- typedef std::list<shared_ptr<RegisteredPrefix> > RegisteredPrefixTable;
+ typedef ContainerWithOnEmptySignal<shared_ptr<RegisteredPrefix>> RegisteredPrefixTable;
explicit
Impl(Face& face)
: m_face(face)
+ , m_scheduler(m_face.getIoService())
+ , m_processEventsTimeoutEvent(m_scheduler)
{
+ auto postOnEmptyPitOrNoRegisteredPrefixes = [this] {
+ this->m_face.getIoService().post(bind(&Impl::onEmptyPitOrNoRegisteredPrefixes, this));
+ // without this extra "post", transport can get paused (-async_read) and then resumed
+ // (+async_read) from within onInterest/onData callback. After onInterest/onData
+ // finishes, there is another +async_read with the same memory block. A few of such
+ // async_read duplications can cause various effects and result in segfault.
+ };
+
+ m_pendingInterestTable.onEmpty.connect(postOnEmptyPitOrNoRegisteredPrefixes);
+ m_registeredPrefixTable.onEmpty.connect(postOnEmptyPitOrNoRegisteredPrefixes);
}
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -59,49 +73,34 @@
void
satisfyPendingInterests(Data& data)
{
- for (PendingInterestTable::iterator i = m_pendingInterestTable.begin();
- i != m_pendingInterestTable.end();
- )
- {
- if ((*i)->getInterest()->matchesData(data))
- {
- // Copy pointers to the objects and remove the PIT entry before calling the callback.
- OnData onData = (*i)->getOnData();
- shared_ptr<const Interest> interest = (*i)->getInterest();
+ for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
+ if ((*entry)->getInterest().matchesData(data)) {
+ shared_ptr<PendingInterest> matchedEntry = *entry;
- PendingInterestTable::iterator next = i;
- ++next;
- m_pendingInterestTable.erase(i);
- i = next;
+ entry = m_pendingInterestTable.erase(entry);
- if (static_cast<bool>(onData)) {
- onData(*interest, data);
- }
- }
- else
- ++i;
+ matchedEntry->invokeDataCallback(data);
}
+ else
+ ++entry;
+ }
}
void
processInterestFilters(Interest& interest)
{
- for (InterestFilterTable::iterator i = m_interestFilterTable.begin();
- i != m_interestFilterTable.end();
- ++i)
- {
- if ((*i)->doesMatch(interest.getName()))
- {
- (**i)(interest);
- }
+ for (const auto& filter : m_interestFilterTable) {
+ if (filter->doesMatch(interest.getName())) {
+ filter->invokeInterestCallback(interest);
}
+ }
}
/////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////
void
- ensureConnected(bool wantResume = true)
+ ensureConnected(bool wantResume)
{
if (!m_face.m_transport->isConnected())
m_face.m_transport->connect(m_face.m_ioService,
@@ -115,26 +114,22 @@
asyncExpressInterest(const shared_ptr<const Interest>& interest,
const OnData& onData, const OnTimeout& onTimeout)
{
- this->ensureConnected();
+ this->ensureConnected(true);
- m_pendingInterestTable.push_back(make_shared<PendingInterest>(interest, onData, onTimeout));
+ auto entry =
+ m_pendingInterestTable.insert(make_shared<PendingInterest>(interest,
+ onData, onTimeout,
+ ref(m_scheduler))).first;
+ (*entry)->setDeleter([this, entry] { m_pendingInterestTable.erase(entry); });
- if (!interest->getLocalControlHeader().empty(nfd::LocalControlHeader::ENCODE_NEXT_HOP))
- {
- // encode only NextHopFaceId towards the forwarder
- m_face.m_transport->send(interest->getLocalControlHeader()
- .wireEncode(*interest, nfd::LocalControlHeader::ENCODE_NEXT_HOP),
- interest->wireEncode());
- }
- else
- {
- m_face.m_transport->send(interest->wireEncode());
- }
-
- if (!m_pitTimeoutCheckTimerActive) {
- m_pitTimeoutCheckTimerActive = true;
- m_pitTimeoutCheckTimer->expires_from_now(time::milliseconds(100));
- m_pitTimeoutCheckTimer->async_wait(bind(&Impl::checkPitExpire, this));
+ if (!interest->getLocalControlHeader().empty(nfd::LocalControlHeader::ENCODE_NEXT_HOP)) {
+ // encode only NextHopFaceId towards the forwarder
+ m_face.m_transport->send(interest->getLocalControlHeader()
+ .wireEncode(*interest, nfd::LocalControlHeader::ENCODE_NEXT_HOP),
+ interest->wireEncode());
+ }
+ else {
+ m_face.m_transport->send(interest->wireEncode());
}
}
@@ -147,19 +142,17 @@
void
asyncPutData(const shared_ptr<const Data>& data)
{
- this->ensureConnected();
+ this->ensureConnected(true);
- if (!data->getLocalControlHeader().empty(nfd::LocalControlHeader::ENCODE_CACHING_POLICY))
- {
- m_face.m_transport->send(
- data->getLocalControlHeader().wireEncode(*data,
- nfd::LocalControlHeader::ENCODE_CACHING_POLICY),
- data->wireEncode());
- }
- else
- {
- m_face.m_transport->send(data->wireEncode());
- }
+ if (!data->getLocalControlHeader().empty(nfd::LocalControlHeader::ENCODE_CACHING_POLICY)) {
+ m_face.m_transport->send(
+ data->getLocalControlHeader().wireEncode(*data,
+ nfd::LocalControlHeader::ENCODE_CACHING_POLICY),
+ data->wireEncode());
+ }
+ else {
+ m_face.m_transport->send(data->wireEncode());
+ }
}
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -215,7 +208,7 @@
afterPrefixRegistered(const shared_ptr<RegisteredPrefix>& registeredPrefix,
const RegisterPrefixSuccessCallback& onSuccess)
{
- m_registeredPrefixTable.push_back(registeredPrefix);
+ m_registeredPrefixTable.insert(registeredPrefix);
if (static_cast<bool>(registeredPrefix->getFilter())) {
// it was a combined operation
@@ -268,75 +261,32 @@
{
m_registeredPrefixTable.erase(item);
- if (!m_pitTimeoutCheckTimerActive && m_registeredPrefixTable.empty())
- {
- m_face.m_transport->pause();
- if (!m_ioServiceWork) {
- m_processEventsTimeoutTimer->cancel();
- }
- }
-
if (static_cast<bool>(onSuccess)) {
onSuccess();
}
}
- /////////////////////////////////////////////////////////////////////////////////////////////////
- /////////////////////////////////////////////////////////////////////////////////////////////////
-
void
- checkPitExpire()
+ onEmptyPitOrNoRegisteredPrefixes()
{
- // Check for PIT entry timeouts.
- time::steady_clock::TimePoint now = time::steady_clock::now();
-
- PendingInterestTable::iterator i = m_pendingInterestTable.begin();
- while (i != m_pendingInterestTable.end())
- {
- if ((*i)->isTimedOut(now))
- {
- // Save the PendingInterest and remove it from the PIT. Then call the callback.
- shared_ptr<PendingInterest> pendingInterest = *i;
-
- i = m_pendingInterestTable.erase(i);
-
- pendingInterest->callTimeout();
- }
- else
- ++i;
- }
-
- if (!m_pendingInterestTable.empty()) {
- m_pitTimeoutCheckTimerActive = true;
-
- m_pitTimeoutCheckTimer->expires_from_now(time::milliseconds(100));
- m_pitTimeoutCheckTimer->async_wait(bind(&Impl::checkPitExpire, this));
- }
- else {
- m_pitTimeoutCheckTimerActive = false;
-
- if (m_registeredPrefixTable.empty()) {
- m_face.m_transport->pause();
- if (!m_ioServiceWork) {
- m_processEventsTimeoutTimer->cancel();
- }
+ if (m_pendingInterestTable.empty() && m_registeredPrefixTable.empty()) {
+ m_face.m_transport->pause();
+ if (!m_ioServiceWork) {
+ m_processEventsTimeoutEvent.cancel();
}
}
}
private:
Face& m_face;
+ util::Scheduler m_scheduler;
+ util::scheduler::ScopedEventId m_processEventsTimeoutEvent;
PendingInterestTable m_pendingInterestTable;
InterestFilterTable m_interestFilterTable;
RegisteredPrefixTable m_registeredPrefixTable;
- ConfigFile m_config;
-
- shared_ptr<boost::asio::io_service::work> m_ioServiceWork; // if thread needs to be preserved
- shared_ptr<monotonic_deadline_timer> m_pitTimeoutCheckTimer;
- bool m_pitTimeoutCheckTimerActive;
- shared_ptr<monotonic_deadline_timer> m_processEventsTimeoutTimer;
+ unique_ptr<boost::asio::io_service::work> m_ioServiceWork; // if thread needs to be preserved
friend class Face;
};