transport: Serialize socket write operations
Change-Id: Ieccaf0ccbd6ee8bd08b1eee77d781edd47b2cbb2
Refs: #1769, #1775
diff --git a/src/common.hpp b/src/common.hpp
index 4a9914c..a35addb 100644
--- a/src/common.hpp
+++ b/src/common.hpp
@@ -60,6 +60,7 @@
using std::shared_ptr;
using std::weak_ptr;
+using std::bad_weak_ptr;
using std::make_shared;
using std::enable_shared_from_this;
@@ -102,6 +103,7 @@
using boost::shared_ptr;
using boost::weak_ptr;
+using boost::bad_weak_ptr;
using boost::make_shared;
using boost::enable_shared_from_this;
diff --git a/src/detail/face-impl.hpp b/src/detail/face-impl.hpp
index 7c259aa..187d7fa 100644
--- a/src/detail/face-impl.hpp
+++ b/src/detail/face-impl.hpp
@@ -137,6 +137,27 @@
m_pendingInterestTable.remove_if(MatchPendingInterestId(pendingInterestId));
}
+ void
+ asyncPutData(const shared_ptr<const Data>& data)
+ {
+ if (!m_face.m_transport->isConnected())
+ m_face.m_transport->connect(*m_face.m_ioService,
+ bind(&Face::onReceiveElement, &m_face, _1));
+
+ if (!m_face.m_transport->isExpectingData())
+ m_face.m_transport->resume();
+
+ if (!data->getLocalControlHeader().empty(false, true))
+ {
+ m_face.m_transport->send(data->getLocalControlHeader().wireEncode(*data, false, true),
+ data->wireEncode());
+ }
+ else
+ {
+ m_face.m_transport->send(data->wireEncode());
+ }
+ }
+
/////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/encoding/block.cpp b/src/encoding/block.cpp
index ed265b0..eeb0c3f 100644
--- a/src/encoding/block.cpp
+++ b/src/encoding/block.cpp
@@ -31,6 +31,7 @@
#include "buffer-stream.hpp"
#include <boost/lexical_cast.hpp>
+#include <boost/asio/buffer.hpp>
namespace ndn {
@@ -397,4 +398,10 @@
boost::lexical_cast<std::string>(type) + "] from Block");
}
+Block::operator boost::asio::const_buffer() const
+{
+ return boost::asio::const_buffer(wire(), size());
+}
+
+
} // namespace ndn
diff --git a/src/encoding/block.hpp b/src/encoding/block.hpp
index c91d12c..17192d8 100644
--- a/src/encoding/block.hpp
+++ b/src/encoding/block.hpp
@@ -29,6 +29,12 @@
#include "buffer.hpp"
#include "tlv.hpp"
+namespace boost {
+namespace asio {
+class const_buffer;
+} // namespace asio
+} // namespace boost
+
namespace ndn {
template<bool> class EncodingImpl;
@@ -281,6 +287,9 @@
bool
operator!=(const Block& other) const;
+public: // ConvertibleToConstBuffer
+ operator boost::asio::const_buffer() const;
+
protected:
ConstBufferPtr m_buffer;
diff --git a/src/face.cpp b/src/face.cpp
index 6f13570..28482ba 100644
--- a/src/face.cpp
+++ b/src/face.cpp
@@ -165,19 +165,18 @@
void
Face::put(const Data& data)
{
- if (!m_transport->isConnected())
- m_transport->connect(*m_ioService,
- bind(&Face::onReceiveElement, this, _1));
+ shared_ptr<const Data> dataPtr;
+ try {
+ dataPtr = data.shared_from_this();
+ }
+ catch (const bad_weak_ptr& e) {
+ std::cerr << "Face::put WARNING: the supplied Data should be created using make_shared<Data>()"
+ << std::endl;
+ dataPtr = make_shared<Data>(data);
+ }
- if (!data.getLocalControlHeader().empty(false, true))
- {
- m_transport->send(data.getLocalControlHeader().wireEncode(data, false, true),
- data.wireEncode());
- }
- else
- {
- m_transport->send(data.wireEncode());
- }
+ // If the same ioService thread, dispatch directly calls the method
+ m_ioService->dispatch(bind(&Impl::asyncPutData, m_impl, dataPtr));
}
void
@@ -186,8 +185,6 @@
m_ioService->post(bind(&Impl::asyncRemovePendingInterest, m_impl, pendingInterestId));
}
-
-
const RegisteredPrefixId*
Face::setInterestFilter(const InterestFilter& interestFilter,
const OnInterest& onInterest,
diff --git a/src/face.hpp b/src/face.hpp
index a5ece29..b915e83 100644
--- a/src/face.hpp
+++ b/src/face.hpp
@@ -440,7 +440,12 @@
* @brief Publish data packet
*
* This method can be called to satisfy the incoming Interest or to put Data packet into
- * the cache of the local NDN forwarder
+ * the cache of the local NDN forwarder.
+ *
+ * @param data Data packet to publish. It is highly recommended to use Data packet that
+ * was created using make_shared<Data>(...). Otherwise, put() will make an
+ * extra copy of the Data packet to ensure validity of published Data until
+ * asynchronous put() operation finishes.
*/
void
put(const Data& data);
diff --git a/src/transport/stream-transport.hpp b/src/transport/stream-transport.hpp
index 04aeac3..491f77a 100644
--- a/src/transport/stream-transport.hpp
+++ b/src/transport/stream-transport.hpp
@@ -38,6 +38,9 @@
typedef Protocol protocol;
typedef StreamTransportImpl<BaseTransport,Protocol> impl;
+ typedef std::list<Block> BlockSequence;
+ typedef std::list<BlockSequence> TransmissionQueue;
+
StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
: m_transport(transport)
, m_socket(ioService)
@@ -58,23 +61,11 @@
resume();
m_transport.m_isConnected = true;
- for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
- m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
- bind(&impl::handle_async_send, this, _1, *i));
-
- for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
- i != m_sendPairQueue.end(); ++i)
- {
- std::vector<boost::asio::const_buffer> buffer;
- buffer.reserve(2);
- buffer.push_back(boost::asio::buffer(i->first.wire(), i->first.size()));
- buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
- m_socket.async_send(buffer,
- bind(&impl::handle_async_send2, this, _1, i->first, i->second));
- }
-
- m_sendQueue.clear();
- m_sendPairQueue.clear();
+ if (!m_transmissionQueue.empty()) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
}
else
{
@@ -124,8 +115,7 @@
m_transport.m_isConnected = false;
m_transport.m_isExpectingData = false;
- m_sendQueue.clear();
- m_sendPairQueue.clear();
+ m_transmissionQueue.clear();
}
void
@@ -159,30 +149,60 @@
void
send(const Block& wire)
{
- if (!m_transport.m_isConnected)
- m_sendQueue.push_back(wire);
- else
- m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
- bind(&impl::handle_async_send, this, _1, wire));
+ BlockSequence sequence;
+ sequence.push_back(wire);
+ m_transmissionQueue.push_back(sequence);
+
+ if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
+
+ // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
+ // next write will be scheduled either in connectHandler or in asyncWriteHandler
}
void
send(const Block& header, const Block& payload)
{
- if (!m_transport.m_isConnected)
- {
- m_sendPairQueue.push_back(std::make_pair(header, payload));
- }
- else
- {
- std::vector<boost::asio::const_buffer> buffers;
- buffers.reserve(2);
- buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
- buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
+ BlockSequence sequence;
+ sequence.push_back(header);
+ sequence.push_back(payload);
+ m_transmissionQueue.push_back(sequence);
- m_socket.async_send(buffers,
- bind(&impl::handle_async_send2, this, _1, header, payload));
+ if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
+
+ // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
+ // next write will be scheduled either in connectHandler or in asyncWriteHandler
+ }
+
+ void
+ handleAsyncWrite(const boost::system::error_code& error,
+ TransmissionQueue::iterator queueItem)
+ {
+ if (error)
+ {
+ if (error == boost::system::errc::operation_canceled) {
+ // async receive has been explicitly cancelled (e.g., socket close)
+ return;
+ }
+
+ m_transport.close();
+ throw Transport::Error(error, "error while sending data to socket");
}
+
+ m_transmissionQueue.erase(queueItem);
+
+ if (!m_transmissionQueue.empty()) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
}
inline bool
@@ -246,19 +266,6 @@
bind(&impl::handle_async_receive, this, _1, _2));
}
- void
- handle_async_send(const boost::system::error_code& error, const Block& wire)
- {
- // pass (needed to keep data block alive during the send)
- }
-
- void
- handle_async_send2(const boost::system::error_code& error,
- const Block& header, const Block& payload)
- {
- // pass (needed to keep data blocks alive during the send)
- }
-
protected:
base_transport& m_transport;
@@ -266,8 +273,7 @@
uint8_t m_inputBuffer[MAX_LENGTH];
size_t m_inputBufferSize;
- std::list< Block > m_sendQueue;
- std::list< std::pair<Block, Block> > m_sendPairQueue;
+ TransmissionQueue m_transmissionQueue;
bool m_connectionInProgress;
boost::asio::deadline_timer m_connectTimer;
diff --git a/src/transport/tcp-transport.cpp b/src/transport/tcp-transport.cpp
index 6fab8e0..7ffe530 100644
--- a/src/transport/tcp-transport.cpp
+++ b/src/transport/tcp-transport.cpp
@@ -53,18 +53,21 @@
void
TcpTransport::send(const Block& wire)
{
+ BOOST_ASSERT(static_cast<bool>(m_impl));
m_impl->send(wire);
}
void
TcpTransport::send(const Block& header, const Block& payload)
{
+ BOOST_ASSERT(static_cast<bool>(m_impl));
m_impl->send(header, payload);
}
void
TcpTransport::close()
{
+ BOOST_ASSERT(static_cast<bool>(m_impl));
m_impl->close();
m_impl.reset();
}
@@ -72,12 +75,15 @@
void
TcpTransport::pause()
{
- m_impl->pause();
+ if (static_cast<bool>(m_impl)) {
+ m_impl->pause();
+ }
}
void
TcpTransport::resume()
{
+ BOOST_ASSERT(static_cast<bool>(m_impl));
m_impl->resume();
}
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index cbfada8..03ee8b6 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -117,8 +117,9 @@
void
UnixTransport::pause()
{
- BOOST_ASSERT(static_cast<bool>(m_impl));
- m_impl->pause();
+ if (static_cast<bool>(m_impl)) {
+ m_impl->pause();
+ }
}
void