face+transport: Cleanup and preparation for implementation of fully async Face operations
Change-Id: I7816b6a9c99a0cf4825459b9652372d6585a5191
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index 603f5e3..811e0d4 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -15,7 +15,7 @@
#include <boost/bind.hpp>
using namespace std;
-typedef boost::asio::local::datagram_protocol protocol;
+typedef boost::asio::local::stream_protocol protocol;
namespace ndn {
@@ -24,67 +24,77 @@
class UnixTransport::Impl
{
public:
- Impl() : socket_(io_)
+ Impl(UnixTransport &transport) // keeps a reference to the UnixTransport that creates this Impl; no longer owns an io_service
+ : transport_(transport),
+ socket_(*transport_.ioService_) // reads through transport_, which is declared (and therefore initialized) before socket_
{
}
- bool
- connect(const std::string &unixSocket, ElementListener& elementListener)
+ void
+ connect() // opens the socket, connects (blocking call) to transport_.unixSocket_, then posts the first async receive
{
socket_.open();
- socket_.connect(protocol::endpoint(unixSocket));
+ socket_.connect(protocol::endpoint(transport_.unixSocket_));
// socket_.async_connect(protocol::endpoint(unixSocket));
socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
boost::bind(&Impl::handle_async_receive, this, _1, _2));
-
- return true;
}
void
- send(const uint8_t *data, size_t dataLength)
- {
- socket_.send(boost::asio::buffer(data, dataLength));
- }
-
- void
- processEvents()
- {
- io_.poll();
- // from boost docs:
- // The poll() function runs handlers that are ready to run, without blocking, until the io_service has been stopped or there are no more ready handlers.
- }
-
- void
- handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
- {
- if (!error && bytes_recvd > 0)
- {
- // inputBuffer_ has bytes_recvd received bytes of data
- }
-
- socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
- boost::bind(&Impl::handle_async_receive, this, _1, _2));
- }
-
- void
close()
{
socket_.close();
}
+
+ void
+ send(const Block &wire) // starts an asynchronous send of wire.wire()/wire.size(); does not wait for completion
+ {
+ socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
+ boost::bind(&Impl::handle_async_send, this, _1, wire)); // boost::bind stores a copy of wire in the handler — presumably to keep the bytes alive until completion (TODO confirm Block copy semantics)
+ }
+
+ void
+ handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd) // completion handler for async_receive: parses what arrived, then re-posts the receive
+ {
+ /// @todo This is a stream socket, not a datagram socket, so one read may deliver a partial element; an internal buffer is needed to reassemble the data before parsing
+
+ if (!error && bytes_recvd > 0)
+ {
+ // inputBuffer_ now holds bytes_recvd freshly received bytes
+ try {
+ Block element(inputBuffer_, bytes_recvd); // NOTE(review): inputBuffer_ is reused by the next receive — confirm Block copies the bytes
+ transport_.receive(element); // hands the element to the transport (presumably forwarded to the receive callback — confirm)
+ }
+ catch(Tlv::Error &error) // note: this 'error' shadows the handler's error_code parameter
+ {
+ // deliberately ignored: the received bytes are dropped and reading continues below
+ }
+ catch(Block::Error &error) // note: this 'error' shadows the handler's error_code parameter
+ {
+ // deliberately ignored: the received bytes are dropped and reading continues below
+ }
+ }
+
+ socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
+ boost::bind(&Impl::handle_async_receive, this, _1, _2)); // re-armed unconditionally, i.e. also when 'error' is set — NOTE(review): confirm this is intended after close()/failure
+ }
+
+ void
+ handle_async_send(const boost::system::error_code& error, const Block &wire) // completion handler for async_send; both arguments are currently unused
+ {
+ // nothing to do yet: send errors are ignored
+ }
private:
- boost::asio::io_service io_;
-
+ UnixTransport &transport_; // reference to the creating transport; must stay declared before socket_, whose initializer reads it
+
protocol::socket socket_;
-
uint8_t inputBuffer_[MAX_LENGTH];
};
UnixTransport::UnixTransport(const std::string &unixSocket/* = "/tmp/.ndnd.sock"*/)
: unixSocket_(unixSocket) // only the socket path is stored here; impl_ is no longer created in the constructor but in connect()
- , isConnected_(false)
- , impl_(new UnixTransport::Impl())
{
}
@@ -93,30 +103,20 @@
}
void
-UnixTransport::connect(ElementListener& elementListener)
+UnixTransport::connect(boost::asio::io_service &ioService, const ReceiveCallback &receiveCallback) // creates the Impl on the caller-supplied io_service and connects it
{
- if (impl_->connect(unixSocket_, elementListener))
- {
- isConnected_ = true;
- }
+ Transport::connect(ioService, receiveCallback); // base-class call comes first; presumably it stores ioService, which Impl's constructor reads via ioService_ — TODO confirm
+
+ impl_ = std::auto_ptr<UnixTransport::Impl> (new UnixTransport::Impl(*this)); // a fresh Impl on every connect(); NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17
+ impl_->connect(); // blocking socket connect; Boost.Asio's connect() without an error_code argument throws on failure, so the flag below is then not set
+
+ isConnected_ = true;
}
void
-UnixTransport::send(const uint8_t *data, size_t dataLength)
+UnixTransport::send(const Block &wire) // forwards the block to Impl::send, which starts an asynchronous send
{
- impl_->send(data, dataLength);
-}
-
-void
-UnixTransport::processEvents()
-{
- impl_->processEvents();
-}
-
-bool
-UnixTransport::getIsConnected()
-{
- return isConnected_;
+ impl_->send(wire); // NOTE(review): impl_ is only assigned in connect(), so send() must not be called before connect()
}
void