Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Consider updating asio-grpc #439

Open
Tradias opened this issue Jul 30, 2023 · 0 comments
Open

Consider updating asio-grpc #439

Tradias opened this issue Jul 30, 2023 · 0 comments

Comments

@Tradias
Copy link

Tradias commented Jul 30, 2023

I generally try my best to regularly update asio-grpc upstream: cpp-pm/hunter#686

With the latest changes in version 2.6.0, we could greatly simplify parts of the implementation. For example, ServerStreamingRpc:

  • No more dangerous thread_local error_code handling
  • No more Dispatcher needed
  • No more read_failed member needed; the decision is now made at compile time
namespace detail
{
/// Exception type that carries the error message of a grpc::Status.
///
/// Only `status.error_message()` is stored; the status code is not part of what().
class GrpcStatusException final : public std::exception
{
  public:
    // Consider including status.error_code in the error message
    explicit GrpcStatusException(const grpc::Status& status) : error_message(status.error_message()) {}

    /// Returns the stored error message. The pointer stays valid for the lifetime of this exception object.
    // std::exception::what() is declared noexcept, so an override must not have a looser exception
    // specification; omitting noexcept here is ill-formed.
    const char* what() const noexcept override { return error_message.c_str(); }

  private:
    std::string error_message;
};

/// Wraps `status` in a GrpcStatusException and returns it as a std::exception_ptr.
///
/// The status is converted unconditionally, i.e. even when `status.ok()` is true.
// Marked inline so that the definition may appear in a header that is included from several translation units.
inline std::exception_ptr to_exception_ptr(const grpc::Status& status)
{
    return std::make_exception_ptr(GrpcStatusException{status});
}

/// Returns a null std::exception_ptr when `status.ok()`, otherwise the result of to_exception_ptr(status).
// Marked inline for the same reason as to_exception_ptr.
inline std::exception_ptr to_exception_ptr_if_not_ok(const grpc::Status& status)
{
    if (status.ok())
    {
        return {};
    }
    return detail::to_exception_ptr(status);
}
}  // namespace detail


/// Server-streaming client RPC built on top of agrpc::ClientRPC<PrepareAsync>.
///
/// start() and read() are composed asynchronous operations that report failure through a
/// std::exception_ptr (see detail::to_exception_ptr) as the first completion argument.
template <auto PrepareAsync>
class ServerStreamingRpc : public agrpc::ClientRPC<PrepareAsync>
{
  private:
    using Base = agrpc::ClientRPC<PrepareAsync>;
    using Stub = typename Base::Stub;
    using Request = typename Base::Request;
    using Reply = typename Base::Response;

    /// Implementation object handed to async_compose by start().
    struct StartOperation
    {
        ServerStreamingRpc& rpc_;
        Stub& stub_;
        const Request& request_;

        /// First step: kick off the underlying Base::start.
        template <typename Self>
        void operator()(Self& self)
        {
            rpc_.Base::start(stub_, request_, std::move(self));
        }

        /// Continuation of Base::start: complete without error on success, otherwise ask for the final status.
        template <typename Self>
        void operator()(Self& self, bool ok)
        {
            if (!ok)
            {
                rpc_.Base::finish(std::move(self));
                return;
            }
            self.complete({});
        }

        /// Continuation of Base::finish: complete with an exception unless the status is OK.
        template <typename Self>
        void operator()(Self& self, grpc::Status status)
        {
            self.complete(detail::to_exception_ptr_if_not_ok(status));
        }
    };

    /// Implementation object handed to async_compose by read().
    struct ReadOperation
    {
        ServerStreamingRpc& rpc_;

        /// First step: read the next message into the shared reply_ member.
        template <typename Self>
        void operator()(Self& self)
        {
            // reply_ may have been moved from by a previous read. That is technically use-after-move, but protobuf
            // message types are in a well-defined state after move (as if default constructed).
            rpc_.Base::read(rpc_.reply_, std::move(self));
        }

        /// Continuation of Base::read: hand out the reply on success, otherwise ask for the final status.
        template <typename Self>
        void operator()(Self& self, bool ok)
        {
            if (!ok)
            {
                rpc_.Base::finish(std::move(self));
                return;
            }
            self.complete({}, std::move(rpc_.reply_));
        }

        /// Continuation of Base::finish: always complete with an exception and an empty reply.
        template <typename Self>
        void operator()(Self& self, grpc::Status status)
        {
            // We should consider returning the reply by optional reference to improve memory reuse and make it easier
            // for the caller to handle OK termination of the stream by the server.
            const bool closed_by_server = status.ok();
            if (closed_by_server)
            {
                // An OK status here means the server ended the stream; report that as ABORTED to the caller.
                status = grpc::Status{grpc::StatusCode::ABORTED, "operation closed by server"};
            }
            self.complete(detail::to_exception_ptr(status), {});
        }
    };

  public:
    explicit ServerStreamingRpc(agrpc::GrpcContext& grpc_context) : Base(grpc_context) {}

    /// Starts the RPC with `request` on `stub`.
    ///
    /// Completion signature: void(std::exception_ptr). The exception_ptr is null on success.
    /// `stub` and `request` are held by reference for the duration of the operation.
    template <typename CompletionToken = asio::use_awaitable_t<>>
    auto start(Stub& stub, const Request& request, CompletionToken&& token = {})
    {
        StartOperation operation{*this, stub, request};
        return boost::asio::async_compose<CompletionToken, void(std::exception_ptr)>(std::move(operation), token);
    }

    /// Reads the next message from the stream.
    ///
    /// Completion signature: void(std::exception_ptr, Reply). On success the exception_ptr is null and the
    /// reply is the received message; otherwise the exception_ptr is set and the reply is default-constructed.
    template <typename CompletionToken = asio::use_awaitable_t<>>
    auto read(CompletionToken&& token = {})
    {
        ReadOperation operation{*this};
        return boost::asio::async_compose<CompletionToken, void(std::exception_ptr, Reply)>(std::move(operation),
                                                                                            token);
    }

    // request_on and read_on are no longer needed, because agrpc::ClientRPC behaves more like a typical asio
    // IoObject, which means it will post to the completion handler's executor before completion. In the case of
    // asio::awaitable that would be the one passed to co_spawn. The old behavior can still be obtained by passing
    // `asio::bind_executor(grpc_context, asio::use_awaitable)`
    // or even `asio::bind_executor(asio::system_executor{}, asio::use_awaitable)`
    // as the completion token.

  private:
    // Keep these base-class operations out of the public interface of this wrapper.
    using Base::finish;
    using Base::read_initial_metadata;

    // Buffer that Base::read fills; moved out to the caller on each successful read.
    Reply reply_;
};
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant