decoding+transport: Exception-safe Block parsing
Change-Id: I3e83b6ca4c8ca42b8bb1ddc8dc50c52ee366c55c
Refs: #1291
diff --git a/src/transport/stream-transport.hpp b/src/transport/stream-transport.hpp
index 8562d88..713d740 100644
--- a/src/transport/stream-transport.hpp
+++ b/src/transport/stream-transport.hpp
@@ -24,12 +24,12 @@
StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
: m_transport(transport)
, m_socket(ioService)
- , m_partialDataSize(0)
+ , m_inputBufferSize(0)
, m_connectionInProgress(false)
, m_connectTimer(ioService)
{
}
-
+
void
connectHandler(const boost::system::error_code& error)
{
@@ -55,7 +55,7 @@
m_socket.async_send(buffer,
bind(&impl::handle_async_send, this, _1, i->first, i->second));
}
-
+
m_sendQueue.clear();
m_sendPairQueue.clear();
}
@@ -76,6 +76,7 @@
m_connectionInProgress = false;
m_transport.m_isConnected = false;
+ m_transport.m_isExpectingData = false;
m_socket.close();
throw Transport::Error(error, "error while connecting to the forwarder");
}
@@ -96,8 +97,8 @@
bind(&impl::connectHandler, this, _1));
}
}
-
- void
+
+ void
close()
{
m_connectTimer.cancel();
@@ -124,7 +125,7 @@
if (!m_transport.m_isExpectingData)
{
m_transport.m_isExpectingData = true;
- m_partialDataSize = 0;
+ m_inputBufferSize = 0;
m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
bind(&impl::handle_async_receive, this, _1, _2));
}
@@ -153,113 +154,75 @@
buffers.reserve(2);
buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
-
+
m_socket.async_send(buffers,
bind(&impl::handle_async_send, this, _1, header, payload));
}
}
-
- inline void
+
+ inline bool
processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
{
+ Block element;
while(offset < availableSize)
{
- Block element(buffer + offset, availableSize - offset);
- m_transport.receive(element);
+ bool ok = Block::fromBuffer(buffer + offset, availableSize - offset, element);
+ if (!ok)
+ return false;
+ m_transport.receive(element);
offset += element.size();
}
+ return true;
}
-
+
void
handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
{
- /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
-
if (error)
{
if (error == boost::system::errc::operation_canceled) {
// async receive has been explicitly cancelled (e.g., socket close)
return;
}
-
+
m_socket.close(); // closing at this point may not be that necessary
m_transport.m_isConnected = true;
throw Transport::Error(error, "error while receiving data from socket");
}
-
- if (!error && bytes_recvd > 0)
+
+ m_inputBufferSize += bytes_recvd;
+ // decode every complete TLV element currently held in the input buffer
+
+ std::size_t offset = 0;
+ bool ok = processAll(m_inputBuffer, offset, m_inputBufferSize);
+ if (!ok && m_inputBufferSize == MAX_LENGTH && offset == 0)
{
- // m_inputBuffer has bytes_recvd received bytes of data
- if (m_partialDataSize > 0)
+ // buffer is full and still holds no decodable TLV: unrecoverable, so close the connection
+ m_socket.close();
+ m_transport.m_isConnected = false;
+ m_transport.m_isExpectingData = false;
+ throw Transport::Error(boost::system::error_code(),
+ "input buffer full, but a valid TLV cannot be decoded");
+ }
+
+ if (offset > 0)
+ {
+ if (offset != m_inputBufferSize)
{
- size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-m_partialDataSize);
- std::copy(m_inputBuffer, m_inputBuffer + newDataSize, m_partialData + m_partialDataSize);
-
- m_partialDataSize += newDataSize;
-
- size_t offset = 0;
- try
- {
- processAll(m_partialData, offset, m_partialDataSize);
-
- // no exceptions => processed the whole thing
- if (bytes_recvd - newDataSize > 0)
- {
- // there is a little bit more data available
-
- offset = 0;
- m_partialDataSize = bytes_recvd - newDataSize;
- std::copy(m_inputBuffer + newDataSize, m_inputBuffer + newDataSize + m_partialDataSize, m_partialData);
-
- processAll(m_partialData, offset, m_partialDataSize);
-
- // no exceptions => processed the whole thing
- m_partialDataSize = 0;
- }
- else
- {
- // done processing
- m_partialDataSize = 0;
- }
- }
- catch(Tlv::Error &)
- {
- if (offset > 0)
- {
- m_partialDataSize -= offset;
- std::copy(m_partialData + offset, m_partialData + offset + m_partialDataSize, m_partialData);
- }
- else if (offset == 0 && m_partialDataSize == MAX_LENGTH)
- {
- // very bad... should close connection
- m_socket.close();
- m_transport.m_isConnected = true;
- throw Transport::Error(boost::system::error_code(),
- "input buffer full, but a valid TLV cannot be decoded");
- }
- }
+ std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
+ m_inputBuffer);
+ m_inputBufferSize -= offset;
}
else
{
- size_t offset = 0;
- try
- {
- processAll(m_inputBuffer, offset, bytes_recvd);
- }
- catch(Tlv::Error &error)
- {
- if (offset > 0)
- {
- m_partialDataSize = bytes_recvd - offset;
- std::copy(m_inputBuffer + offset, m_inputBuffer + offset + m_partialDataSize, m_partialData);
- }
- }
+ m_inputBufferSize = 0;
}
}
- m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
- bind(&impl::handle_async_receive, this, _1, _2));
+ m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+ MAX_LENGTH - m_inputBufferSize), 0,
+ bind(&impl::handle_async_receive, this, _1, _2));
}
void
@@ -277,12 +240,10 @@
protected:
base_transport& m_transport;
-
+
typename protocol::socket m_socket;
uint8_t m_inputBuffer[MAX_LENGTH];
-
- uint8_t m_partialData[MAX_LENGTH];
- size_t m_partialDataSize;
+ size_t m_inputBufferSize;
std::list< Block > m_sendQueue;
std::list< std::pair<Block, Block> > m_sendPairQueue;
@@ -304,7 +265,7 @@
: StreamTransportImpl<base_transport, protocol>(transport, ioService)
{
}
-
+
void
resolveHandler(const boost::system::error_code& error,
typename protocol::resolver::iterator endpoint,
@@ -314,15 +275,16 @@
{
if (error == boost::system::errc::operation_canceled)
return;
-
+
throw Transport::Error(error, "Error during resolution of host or port");
}
-
+
typename protocol::resolver::iterator end;
if (endpoint == end)
{
this->m_connectionInProgress = false;
this->m_transport.m_isConnected = false;
+ this->m_transport.m_isExpectingData = false;
this->m_socket.close();
throw Transport::Error(error, "Unable to resolve because host or port");
}
@@ -330,7 +292,7 @@
this->m_socket.async_connect(*endpoint,
bind(&impl::connectHandler, this, _1));
}
-
+
void
connect(const typename protocol::resolver::query& query)
{