transport: StreamTransportImpl and StreamTransportWithResolverImpl segfault fix
refs #3136
Change-Id: I9cb4d5ff760cd8005ace699e82934c5951935b97
diff --git a/src/transport/stream-transport-impl.hpp b/src/transport/stream-transport-impl.hpp
index 963293e..d542741 100644
--- a/src/transport/stream-transport-impl.hpp
+++ b/src/transport/stream-transport-impl.hpp
@@ -35,7 +35,7 @@
* and boost::asio::local::stream_protocol
*/
template<typename BaseTransport, typename Protocol>
-class StreamTransportImpl
+class StreamTransportImpl : public enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
{
public:
typedef StreamTransportImpl<BaseTransport,Protocol> Impl;
@@ -60,10 +60,10 @@
// Wait at most 4 seconds to connect
/// @todo Decide whether this number should be configurable
m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
- m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
+ m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this->shared_from_this(), _1));
m_socket.open();
- m_socket.async_connect(endpoint, bind(&Impl::connectHandler, this, _1));
+ m_socket.async_connect(endpoint, bind(&Impl::connectHandler, this->shared_from_this(), _1));
}
}
@@ -103,8 +103,7 @@
if (!m_transport.m_isReceiving) {
m_transport.m_isReceiving = true;
m_inputBufferSize = 0;
- m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
- bind(&Impl::handleAsyncReceive, this, _1, _2));
+ asyncReceive();
}
}
@@ -137,9 +136,7 @@
m_transport.m_isConnected = true;
if (!m_transmissionQueue.empty()) {
- boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
- bind(&Impl::handleAsyncWrite, this, _1,
- m_transmissionQueue.begin()));
+ asyncWrite();
}
}
else {
@@ -165,9 +162,7 @@
m_transmissionQueue.emplace_back(sequence);
if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
- boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
- bind(&Impl::handleAsyncWrite, this, _1,
- m_transmissionQueue.begin()));
+ asyncWrite();
}
// if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
@@ -175,6 +170,14 @@
}
void
+ asyncWrite()
+ {
+ BOOST_ASSERT(!m_transmissionQueue.empty());
+ boost::asio::async_write(m_socket, m_transmissionQueue.front(),
+ bind(&Impl::handleAsyncWrite, this->shared_from_this(), _1, m_transmissionQueue.begin()));
+ }
+
+ void
handleAsyncWrite(const boost::system::error_code& error, TransmissionQueue::iterator queueItem)
{
if (error) {
@@ -194,13 +197,19 @@
m_transmissionQueue.erase(queueItem);
if (!m_transmissionQueue.empty()) {
- boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
- bind(&Impl::handleAsyncWrite, this, _1,
- m_transmissionQueue.begin()));
+ asyncWrite();
}
}
void
+ asyncReceive()
+ {
+ m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+ MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
+ bind(&Impl::handleAsyncReceive, this->shared_from_this(), _1, _2));
+ }
+
+ void
handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
{
if (error) {
@@ -235,9 +244,7 @@
}
}
- m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
- MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
- bind(&Impl::handleAsyncReceive, this, _1, _2));
+ asyncReceive();
}
bool
diff --git a/src/transport/stream-transport-with-resolver-impl.hpp b/src/transport/stream-transport-with-resolver-impl.hpp
index 1fa3351..ede8444 100644
--- a/src/transport/stream-transport-with-resolver-impl.hpp
+++ b/src/transport/stream-transport-with-resolver-impl.hpp
@@ -52,11 +52,11 @@
// Wait at most 4 seconds to connect
/// @todo Decide whether this number should be configurable
this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
- this->m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
+ this->m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this->shared_from_this(), _1));
// typename boost::asio::ip::basic_resolver< Protocol > resolver;
auto resolver = make_shared<typename Protocol::resolver>(ref(this->m_socket.get_io_service()));
- resolver->async_resolve(query, bind(&Impl::resolveHandler, this, _1, _2, resolver));
+ resolver->async_resolve(query, bind(&Impl::resolveHandler, this->shared_from_base(), _1, _2, resolver));
}
protected:
@@ -78,7 +78,14 @@
BOOST_THROW_EXCEPTION(Transport::Error(error, "Unable to resolve because host or port"));
}
- this->m_socket.async_connect(*endpoint, bind(&Impl::connectHandler, this, _1));
+ this->m_socket.async_connect(*endpoint, bind(&Impl::connectHandler, this->shared_from_this(), _1));
+ }
+
+private:
+ shared_ptr<Impl>
+ shared_from_base()
+ {
+ return static_pointer_cast<Impl>(this->shared_from_this());
}
};