/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2024,  Regents of the University of California,
 *                           Arizona Board of Regents,
 *                           Colorado State University,
 *                           University Pierre & Marie Curie, Sorbonne University,
 *                           Washington University in St. Louis,
 *                           Beijing Institute of Technology,
 *                           The University of Memphis.
 *
 * This file is part of NFD (Named Data Networking Forwarding Daemon).
 * See AUTHORS.md for complete list of NFD authors and contributors.
 *
 * NFD is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE.  See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP

#include "transport.hpp"
#include "socket-utils.hpp"
#include "common/global.hpp"

#include <array>
#include <queue>

#include <boost/asio/defer.hpp>
#include <boost/asio/write.hpp>

namespace nfd::face {

/**
 * \brief Implements a Transport for stream-based protocols.
 *
 * \tparam Protocol a stream-based protocol in Boost.Asio
 */
template<class Protocol>
class StreamTransport : public Transport
{
public:
  using protocol = Protocol;

  /**
   * \brief Construct stream transport.
   *
   * \param socket Protocol-specific socket for the created transport
   */
  explicit
  StreamTransport(typename protocol::socket&& socket);

  ssize_t
  getSendQueueLength() override;

protected:
  void
  doClose() override;

  void
  deferredClose();

  void
  doSend(const Block& packet) override;

  void
  sendFromQueue();

  void
  handleSend(const boost::system::error_code& error,
             size_t nBytesSent);

  void
  startReceive();

  void
  handleReceive(const boost::system::error_code& error,
                size_t nBytesReceived);

  void
  processErrorCode(const boost::system::error_code& error);

  virtual void
  handleError(const boost::system::error_code& error);

  void
  resetReceiveBuffer();

  void
  resetSendQueue();

  size_t
  getSendQueueBytes() const;

protected:
  typename protocol::socket m_socket;

  NFD_LOG_MEMBER_DECL();

private:
  size_t m_sendQueueBytes = 0;
  std::queue<Block> m_sendQueue;
  size_t m_receiveBufferSize = 0;
  std::array<uint8_t, ndn::MAX_NDN_PACKET_SIZE> m_receiveBuffer;
};


template<class T>
StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
  : m_socket(std::move(socket))
{
  // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
  // Therefore, protecting against send queue overflows is less critical than in other transport
  // types. Instead, we use the default threshold specified in the GenericLinkService options.

  startReceive();
}

template<class T>
ssize_t
StreamTransport<T>::getSendQueueLength()
{
  ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
  if (queueLength == QUEUE_ERROR) {
    NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
  }
  return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
}

template<class T>
void
StreamTransport<T>::doClose()
{
  NFD_LOG_FACE_TRACE(__func__);

  if (m_socket.is_open()) {
    // Cancel all outstanding operations and shutdown the socket
    // so that no further sends or receives are possible.
    // Use the non-throwing variants and ignore errors, if any.
    boost::system::error_code error;
    m_socket.cancel(error);
    m_socket.shutdown(boost::asio::socket_base::shutdown_both, error);
  }

  // Ensure that the Transport stays alive at least until
  // all pending handlers are dispatched
  boost::asio::defer(getGlobalIoService(), [this] { deferredClose(); });

  // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
  //
  // When doClose is called from a socket event handler (e.g., from handleReceive),
  // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
  // Instead, handleSend is invoked as nothing bad happened.
  //
  // In order to prevent the assertion in handleSend from failing, we clear the queue
  // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
  // this point have been executed.  If more send operations are scheduled after this
  // point, they will fail because the socket has been shutdown, and their callbacks
  // will be invoked with error code == asio::error::shut_down.
}

template<class T>
void
StreamTransport<T>::deferredClose()
{
  NFD_LOG_FACE_TRACE(__func__);

  resetSendQueue();

  // use the non-throwing variant and ignore errors, if any
  boost::system::error_code error;
  m_socket.close(error);

  this->setState(TransportState::CLOSED);
}

template<class T>
void
StreamTransport<T>::doSend(const Block& packet)
{
  NFD_LOG_FACE_TRACE(__func__);

  if (getState() != TransportState::UP)
    return;

  bool wasQueueEmpty = m_sendQueue.empty();
  m_sendQueue.push(packet);
  m_sendQueueBytes += packet.size();

  if (wasQueueEmpty)
    sendFromQueue();
}

template<class T>
void
StreamTransport<T>::sendFromQueue()
{
  boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
                           [this] (auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
}

template<class T>
void
StreamTransport<T>::handleSend(const boost::system::error_code& error,
                               size_t nBytesSent)
{
  if (error)
    return processErrorCode(error);

  NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");

  BOOST_ASSERT(!m_sendQueue.empty());
  BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
  m_sendQueueBytes -= nBytesSent;
  m_sendQueue.pop();

  if (!m_sendQueue.empty())
    sendFromQueue();
}

template<class T>
void
StreamTransport<T>::startReceive()
{
  BOOST_ASSERT(getState() == TransportState::UP);

  m_socket.async_receive(boost::asio::buffer(m_receiveBuffer.data() + m_receiveBufferSize,
                                             m_receiveBuffer.size() - m_receiveBufferSize),
                         [this] (auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
}

template<class T>
void
StreamTransport<T>::handleReceive(const boost::system::error_code& error,
                                  size_t nBytesReceived)
{
  if (error)
    return processErrorCode(error);

  NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");

  m_receiveBufferSize += nBytesReceived;
  auto unparsedBytes = ndn::make_span(m_receiveBuffer).first(m_receiveBufferSize);
  while (!unparsedBytes.empty()) {
    auto [isOk, element] = Block::fromBuffer(unparsedBytes);
    if (!isOk)
      break;

    unparsedBytes = unparsedBytes.subspan(element.size());
    this->receive(element);
  }

  if (unparsedBytes.empty()) {
    // nothing left in the receive buffer
    m_receiveBufferSize = 0;
  }
  else if (unparsedBytes.data() != m_receiveBuffer.data()) {
    // move remaining unparsed bytes to the beginning of the receive buffer
    std::copy(unparsedBytes.begin(), unparsedBytes.end(), m_receiveBuffer.begin());
    m_receiveBufferSize = unparsedBytes.size();
  }
  else if (unparsedBytes.size() == m_receiveBuffer.size()) {
    NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
    this->setState(TransportState::FAILED);
    doClose();
    return;
  }

  startReceive();
}

template<class T>
void
StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
{
  NFD_LOG_FACE_TRACE(__func__);

  if (getState() == TransportState::CLOSING ||
      getState() == TransportState::FAILED ||
      getState() == TransportState::CLOSED ||
      error == boost::asio::error::operation_aborted || // when cancel() is called
      error == boost::asio::error::shut_down)           // after shutdown() is called
    // transport is shutting down, ignore any errors
    return;

  handleError(error);
}

template<class T>
void
StreamTransport<T>::handleError(const boost::system::error_code& error)
{
  if (error == boost::asio::error::eof) {
    this->setState(TransportState::CLOSING);
  }
  else {
    NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
    this->setState(TransportState::FAILED);
  }
  doClose();
}

template<class T>
void
StreamTransport<T>::resetReceiveBuffer()
{
  m_receiveBufferSize = 0;
}

template<class T>
void
StreamTransport<T>::resetSendQueue()
{
  std::queue<Block> emptyQueue;
  std::swap(emptyQueue, m_sendQueue);
  m_sendQueueBytes = 0;
}

template<class T>
size_t
StreamTransport<T>::getSendQueueBytes() const
{
  return m_sendQueueBytes;
}

} // namespace nfd::face

#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
