transport/unix: Adding complex logic to handle streamed data
This part needs serious testing, but not sure how to do it
Change-Id: Ib1bb036a46017e9ac6058fc5e017a0fed21b43a2
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index 811e0d4..3e01943 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -10,6 +10,7 @@
#include <ndn-cpp/face.hpp>
#include <ndn-cpp/transport/unix-transport.hpp>
+#include <ndn-cpp/c/util/ndn_memory.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
@@ -25,8 +26,9 @@
{
public:
Impl(UnixTransport &transport)
- : transport_(transport),
- socket_(*transport_.ioService_)
+ : transport_(transport)
+ , socket_(*transport_.ioService_)
+ , partialDataSize_(0)
{
}
@@ -37,6 +39,7 @@
socket_.connect(protocol::endpoint(transport_.unixSocket_));
// socket_.async_connect(protocol::endpoint(unixSocket));
+ partialDataSize_ = 0;
socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
boost::bind(&Impl::handle_async_receive, this, _1, _2));
}
@@ -53,25 +56,95 @@
socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
boost::bind(&Impl::handle_async_send, this, _1, wire));
}
+
+ inline void
+ processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
+ {
+ while(offset < availableSize)
+ {
+ Block element(buffer + offset, availableSize - offset);
+ transport_.receive(element);
+
+ offset += element.size();
+ }
+ }
void
handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
{
/// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
-
+
if (!error && bytes_recvd > 0)
{
- // inputBuffer_ has bytes_recvd received bytes of data
- try {
- Block element(inputBuffer_, bytes_recvd);
- transport_.receive(element);
- }
+ try
+ {
+ // inputBuffer_ has bytes_recvd received bytes of data
+ if (partialDataSize_ > 0)
+ {
+ size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
+ ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
+ partialDataSize_ += newDataSize;
+
+ size_t offset = 0;
+ try
+ {
+ processAll(partialData_, offset, partialDataSize_);
+
+ // no exceptions => processed the whole thing
+ if (bytes_recvd - newDataSize > 0)
+ {
+ // there is a little bit more data available
+
+ offset = 0;
+ partialDataSize_ = bytes_recvd - newDataSize;
+ ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
+
+ processAll(partialData_, offset, partialDataSize_);
+
+ // no exceptions => processed the whole thing
+ partialDataSize_ = 0;
+ }
+ else
+ {
+ // done processing
+ partialDataSize_ = 0;
+ }
+ }
+ catch(Block::Error &)
+ {
+ if (offset > 0)
+ {
+ partialDataSize_ -= offset;
+ ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_);
+ }
+ else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
+ {
+ // very bad... should close connection
+ /// @todo Notify somebody
+ socket_.close();
+ }
+ }
+ }
+ else
+ {
+ size_t offset = 0;
+ try
+ {
+ processAll(inputBuffer_, offset, bytes_recvd);
+ }
+ catch(Block::Error &error)
+ {
+ if (offset > 0)
+ {
+ partialDataSize_ = bytes_recvd - offset;
+ ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
+ }
+ }
+ }
+ }
catch(Tlv::Error &error)
{
- // pass
- }
- catch(Block::Error &error)
- {
+ std::cerr << "[[handle_async_receive]] Tlv::Error: " << error.what() << std::endl;
// pass
}
}
@@ -83,7 +156,7 @@
void
handle_async_send(const boost::system::error_code& error, const Block &wire)
{
- // pass
+ // pass (needed to keep data block alive during the send)
}
private:
@@ -91,6 +164,9 @@
protocol::socket socket_;
uint8_t inputBuffer_[MAX_LENGTH];
+
+ uint8_t partialData_[MAX_LENGTH];
+ size_t partialDataSize_;
};
UnixTransport::UnixTransport(const std::string &unixSocket/* = "/tmp/.ndnd.sock"*/)