decoding+transport: Exception-safe Block parsing

Change-Id: I3e83b6ca4c8ca42b8bb1ddc8dc50c52ee366c55c
Refs: #1291
diff --git a/src/transport/stream-transport.hpp b/src/transport/stream-transport.hpp
index 8562d88..713d740 100644
--- a/src/transport/stream-transport.hpp
+++ b/src/transport/stream-transport.hpp
@@ -24,12 +24,12 @@
   StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
     : m_transport(transport)
     , m_socket(ioService)
-    , m_partialDataSize(0)
+    , m_inputBufferSize(0)
     , m_connectionInProgress(false)
     , m_connectTimer(ioService)
   {
   }
-  
+
   void
   connectHandler(const boost::system::error_code& error)
   {
@@ -55,7 +55,7 @@
             m_socket.async_send(buffer,
                                 bind(&impl::handle_async_send, this, _1, i->first, i->second));
           }
-        
+
         m_sendQueue.clear();
         m_sendPairQueue.clear();
       }
@@ -76,6 +76,7 @@
 
     m_connectionInProgress = false;
     m_transport.m_isConnected = false;
+    m_transport.m_isExpectingData = false;
     m_socket.close();
     throw Transport::Error(error, "error while connecting to the forwarder");
   }
@@ -96,8 +97,8 @@
                              bind(&impl::connectHandler, this, _1));
     }
   }
-  
-  void 
+
+  void
   close()
   {
     m_connectTimer.cancel();
@@ -124,7 +125,7 @@
     if (!m_transport.m_isExpectingData)
       {
         m_transport.m_isExpectingData = true;
-        m_partialDataSize = 0;
+        m_inputBufferSize = 0;
         m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
                                bind(&impl::handle_async_receive, this, _1, _2));
       }
@@ -153,113 +154,75 @@
         buffers.reserve(2);
         buffers.push_back(boost::asio::buffer(header.wire(),  header.size()));
         buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
-                          
+
         m_socket.async_send(buffers,
                             bind(&impl::handle_async_send, this, _1, header, payload));
       }
   }
-  
-  inline void
+
+  inline bool
   processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
   {
+    Block element;
     while(offset < availableSize)
       {
-        Block element(buffer + offset, availableSize - offset);
-        m_transport.receive(element);
+        bool ok = Block::fromBuffer(buffer + offset, availableSize - offset, element);
+        if (!ok)
+          return false;
 
+        m_transport.receive(element);
         offset += element.size();
       }
+    return true;
   }
-  
+
   void
   handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
   {
-    /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
-
     if (error)
       {
         if (error == boost::system::errc::operation_canceled) {
           // async receive has been explicitly cancelled (e.g., socket close)
           return;
         }
-        
+
         m_socket.close(); // closing at this point may not be that necessary
         m_transport.m_isConnected = true;
         throw Transport::Error(error, "error while receiving data from socket");
       }
-    
-    if (!error && bytes_recvd > 0)
+
+    m_inputBufferSize += bytes_recvd;
+    // decode every complete TLV block currently held in the input buffer
+
+    std::size_t offset = 0;
+    bool ok = processAll(m_inputBuffer, offset, m_inputBufferSize);
+    if (!ok && m_inputBufferSize == MAX_LENGTH && offset == 0)
       {
-        // m_inputBuffer has bytes_recvd received bytes of data
-        if (m_partialDataSize > 0)
+        // buffer is full yet no TLV could be decoded: unrecoverable, close the connection
+        m_socket.close();
+        m_transport.m_isConnected = false;
+        m_transport.m_isExpectingData = false;
+        throw Transport::Error(boost::system::error_code(),
+                               "input buffer full, but a valid TLV cannot be decoded");
+      }
+
+    if (offset > 0)
+      {
+        if (offset != m_inputBufferSize)
           {
-            size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-m_partialDataSize);
-            std::copy(m_inputBuffer, m_inputBuffer + newDataSize, m_partialData + m_partialDataSize);
-
-            m_partialDataSize += newDataSize;
-              
-            size_t offset = 0;
-            try
-              {
-                processAll(m_partialData, offset, m_partialDataSize);
-
-                // no exceptions => processed the whole thing
-                if (bytes_recvd - newDataSize > 0)
-                  {
-                    // there is a little bit more data available
-                        
-                    offset = 0;
-                    m_partialDataSize = bytes_recvd - newDataSize;
-                    std::copy(m_inputBuffer + newDataSize, m_inputBuffer + newDataSize + m_partialDataSize, m_partialData);
-
-                    processAll(m_partialData, offset, m_partialDataSize);
-
-                    // no exceptions => processed the whole thing
-                    m_partialDataSize = 0;
-                  }
-                else
-                  {
-                    // done processing
-                    m_partialDataSize = 0;
-                  }
-              }
-            catch(Tlv::Error &)
-              {
-                if (offset > 0)
-                  {
-                    m_partialDataSize -= offset;
-                    std::copy(m_partialData + offset, m_partialData + offset + m_partialDataSize, m_partialData);
-                  }
-                else if (offset == 0 && m_partialDataSize == MAX_LENGTH)
-                  {
-                    // very bad... should close connection
-                    m_socket.close();
-                    m_transport.m_isConnected = true;
-                    throw Transport::Error(boost::system::error_code(),
-                                           "input buffer full, but a valid TLV cannot be decoded");
-                  }
-              }
+            std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
+                      m_inputBuffer);
+            m_inputBufferSize -= offset;
           }
         else
           {
-            size_t offset = 0;
-            try
-              {
-                processAll(m_inputBuffer, offset, bytes_recvd);
-              }
-            catch(Tlv::Error &error)
-              {
-                if (offset > 0)
-                  {
-                    m_partialDataSize = bytes_recvd - offset;
-                    std::copy(m_inputBuffer + offset, m_inputBuffer + offset + m_partialDataSize, m_partialData);
-                  }
-              }
+            m_inputBufferSize = 0;
           }
       }
 
-    m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
-                          bind(&impl::handle_async_receive, this, _1, _2));
+    m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+                                               MAX_LENGTH - m_inputBufferSize), 0,
+                           bind(&impl::handle_async_receive, this, _1, _2));
   }
 
   void
@@ -277,12 +240,10 @@
 
 protected:
   base_transport& m_transport;
-  
+
   typename protocol::socket m_socket;
   uint8_t m_inputBuffer[MAX_LENGTH];
-
-  uint8_t m_partialData[MAX_LENGTH];
-  size_t m_partialDataSize;
+  size_t m_inputBufferSize;
 
   std::list< Block > m_sendQueue;
   std::list< std::pair<Block, Block> > m_sendPairQueue;
@@ -304,7 +265,7 @@
     : StreamTransportImpl<base_transport, protocol>(transport, ioService)
   {
   }
-  
+
   void
   resolveHandler(const boost::system::error_code& error,
                  typename protocol::resolver::iterator endpoint,
@@ -314,15 +275,16 @@
       {
         if (error == boost::system::errc::operation_canceled)
           return;
-        
+
         throw Transport::Error(error, "Error during resolution of host or port");
       }
-    
+
     typename protocol::resolver::iterator end;
     if (endpoint == end)
       {
         this->m_connectionInProgress = false;
         this->m_transport.m_isConnected = false;
+        this->m_transport.m_isExpectingData = false;
         this->m_socket.close();
         throw Transport::Error(error, "Unable to resolve because host or port");
       }
@@ -330,7 +292,7 @@
     this->m_socket.async_connect(*endpoint,
                                  bind(&impl::connectHandler, this, _1));
   }
-  
+
   void
   connect(const typename protocol::resolver::query& query)
   {