From 9b0d6ded9a711e6cfc36c5d5e7b644de8f609ee2 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Thu, 4 Apr 2024 22:44:58 +0200 Subject: [PATCH] fix: Fixed race condition in action server between is_ready and take_data and execute Some background information: is_ready, take_data and execute data may be called from different threads in any order. The code in the old state expected them to be called in series, without interruption. This lead to multiple race conditions, as the state of the pimpl objects was altered by the three functions in a non thread safe way. Signed-off-by: Janosch Machowinski Signed-off-by: William Woodall # Conflicts: # rclcpp_action/include/rclcpp_action/server.hpp # rclcpp_action/src/client.cpp # rclcpp_action/src/server.cpp # rclcpp_action/test/test_server.cpp --- .../include/rclcpp_action/server.hpp | 14 +- rclcpp_action/src/client.cpp | 370 ++++++++++++------ rclcpp_action/src/server.cpp | 267 +++++++------ rclcpp_action/test/test_server.cpp | 3 + 4 files changed, 409 insertions(+), 245 deletions(-) diff --git a/rclcpp_action/include/rclcpp_action/server.hpp b/rclcpp_action/include/rclcpp_action/server.hpp index 7461de2867..89707980d4 100644 --- a/rclcpp_action/include/rclcpp_action/server.hpp +++ b/rclcpp_action/include/rclcpp_action/server.hpp @@ -22,6 +22,7 @@ #include #include +#include "action_msgs/srv/cancel_goal.hpp" #include "rcl/event_callback.h" #include "rcl_action/action_server.h" #include "rosidl_runtime_c/action_type_support_struct.h" @@ -77,6 +78,7 @@ class ServerBase : public rclcpp::Waitable GoalService, ResultService, CancelService, + Expired, }; RCLCPP_ACTION_PUBLIC @@ -279,19 +281,25 @@ class ServerBase : public rclcpp::Waitable /// \internal RCLCPP_ACTION_PUBLIC void - execute_goal_request_received(const std::shared_ptr & data); + execute_goal_request_received( + rcl_ret_t ret, rcl_action_goal_info_t goal_info, rmw_request_id_t request_header, + std::shared_ptr message); /// Handle a request to cancel goals on the server /// \internal RCLCPP_ACTION_PUBLIC void - execute_cancel_request_received(const std::shared_ptr & data); + execute_cancel_request_received( + rcl_ret_t ret, std::shared_ptr request, + rmw_request_id_t request_header); /// Handle a request to get the result of an action /// \internal RCLCPP_ACTION_PUBLIC void - execute_result_request_received(const std::shared_ptr & data); + execute_result_request_received( + rcl_ret_t ret, std::shared_ptr result_request, + rmw_request_id_t request_header); /// Handle a timeout indicating a completed goal should be forgotten by the server /// \internal diff --git a/rclcpp_action/src/client.cpp b/rclcpp_action/src/client.cpp index fc5b3eeb90..3ea9b1fb1a 100644 --- a/rclcpp_action/src/client.cpp +++ b/rclcpp_action/src/client.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rcl_action/action_client.h" #include "rcl_action/wait.h" @@ -31,6 +32,67 @@ namespace rclcpp_action { +struct ClientBaseData +{ + struct FeedbackReadyData + { + FeedbackReadyData(rcl_ret_t retIn, std::shared_ptr msg) + : ret(retIn), feedback_message(msg) {} + rcl_ret_t ret; + std::shared_ptr feedback_message; + }; + struct StatusReadyData + { + StatusReadyData(rcl_ret_t retIn, std::shared_ptr msg) + : ret(retIn), status_message(msg) {} + rcl_ret_t ret; + std::shared_ptr status_message; + }; + struct GoalResponseData + { + GoalResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr response) + : ret(retIn), response_header(header), goal_response(response) {} + rcl_ret_t ret; + rmw_request_id_t response_header; + std::shared_ptr goal_response; + }; + struct CancelResponseData + { + CancelResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr response) + : ret(retIn), response_header(header), cancel_response(response) {} + rcl_ret_t ret; + rmw_request_id_t response_header; + std::shared_ptr cancel_response; + }; + struct ResultResponseData + { + ResultResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr response) + : ret(retIn), response_header(header), result_response(response) {} + rcl_ret_t ret; + rmw_request_id_t response_header; + std::shared_ptr result_response; + }; + + std::variant< + FeedbackReadyData, + StatusReadyData, + GoalResponseData, + CancelResponseData, + ResultResponseData + > data; + + explicit ClientBaseData(FeedbackReadyData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(StatusReadyData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(GoalResponseData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(CancelResponseData && data_in) + : data(std::move(data_in)) {} + explicit ClientBaseData(ResultResponseData && data_in) + : data(std::move(data_in)) {} +}; + class ClientBaseImpl { public: @@ -94,11 +156,11 @@ class ClientBaseImpl size_t num_clients{0u}; size_t num_services{0u}; - bool is_feedback_ready{false}; - bool is_status_ready{false}; - bool is_goal_response_ready{false}; - bool is_cancel_response_ready{false}; - bool is_result_response_ready{false}; + // Lock for action_client_ + std::recursive_mutex action_client_mutex_; + + // next ready event for taking, will be set by is_ready and will be processed by take_data + std::atomic next_ready_event; rclcpp::Context::SharedPtr context_; rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_; @@ -142,6 +204,7 @@ bool ClientBase::action_server_is_ready() const { bool is_ready; + std::lock_guard lock(pimpl_->action_client_mutex_); rcl_ret_t ret = rcl_action_server_is_available( this->pimpl_->node_handle.get(), this->pimpl_->client_handle.get(), @@ -255,6 +318,7 @@ ClientBase::get_number_of_ready_services() void ClientBase::add_to_wait_set(rcl_wait_set_t & wait_set) { + std::lock_guard lock(pimpl_->action_client_mutex_); rcl_ret_t ret = rcl_action_wait_set_add_action_client( &wait_set, pimpl_->client_handle.get(), nullptr, nullptr); if (RCL_RET_OK != ret) { @@ -265,24 +329,56 @@ ClientBase::add_to_wait_set(rcl_wait_set_t & wait_set) bool ClientBase::is_ready(const rcl_wait_set_t & wait_set) { - rcl_ret_t ret = rcl_action_client_wait_set_get_entities_ready( - &wait_set, - pimpl_->client_handle.get(), - &pimpl_->is_feedback_ready, - &pimpl_->is_status_ready, - &pimpl_->is_goal_response_ready, - &pimpl_->is_cancel_response_ready, - &pimpl_->is_result_response_ready); - if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error( - ret, "failed to check for any ready entities"); + bool is_feedback_ready{false}; + bool is_status_ready{false}; + bool is_goal_response_ready{false}; + bool is_cancel_response_ready{false}; + bool is_result_response_ready{false}; + + rcl_ret_t ret; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + ret = rcl_action_client_wait_set_get_entities_ready( + &wait_set, pimpl_->client_handle.get(), + &is_feedback_ready, + &is_status_ready, + &is_goal_response_ready, + &is_cancel_response_ready, + &is_result_response_ready); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to check for any ready entities"); + } + } + + pimpl_->next_ready_event = std::numeric_limits::max(); + + if (is_feedback_ready) { + pimpl_->next_ready_event = static_cast(EntityType::FeedbackSubscription); + return true; + } + + if (is_status_ready) { + pimpl_->next_ready_event = static_cast(EntityType::StatusSubscription); + return true; + } + + if (is_goal_response_ready) { + pimpl_->next_ready_event = static_cast(EntityType::GoalClient); + return true; + } + + if (is_result_response_ready) { + pimpl_->next_ready_event = static_cast(EntityType::ResultClient); + return true; + } + + if (is_cancel_response_ready) { + pimpl_->next_ready_event = static_cast(EntityType::CancelClient); + return true; } - return - pimpl_->is_feedback_ready || - pimpl_->is_status_ready || - pimpl_->is_goal_response_ready || - pimpl_->is_cancel_response_ready || - pimpl_->is_result_response_ready; + + return false; } void @@ -433,7 +529,6 @@ ClientBase::set_callback_to_entity( } }; - // Set it temporarily to the new callback, while we replace the old one. // This two-step setting, prevents a gap where the old std::function has // been replaced but the middleware hasn't been told about the new one yet. @@ -551,140 +646,155 @@ ClientBase::clear_on_ready_callback() std::shared_ptr ClientBase::take_data() { - if (pimpl_->is_feedback_ready) { - std::shared_ptr feedback_message = this->create_feedback_message(); - rcl_ret_t ret = rcl_action_take_feedback( - pimpl_->client_handle.get(), feedback_message.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, feedback_message)); - } else if (pimpl_->is_status_ready) { - std::shared_ptr status_message = this->create_status_message(); - rcl_ret_t ret = rcl_action_take_status( - pimpl_->client_handle.get(), status_message.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, status_message)); - } else if (pimpl_->is_goal_response_ready) { - rmw_request_id_t response_header; - std::shared_ptr goal_response = this->create_goal_response(); - rcl_ret_t ret = rcl_action_take_goal_response( - pimpl_->client_handle.get(), &response_header, goal_response.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, response_header, goal_response)); - } else if (pimpl_->is_result_response_ready) { - rmw_request_id_t response_header; - std::shared_ptr result_response = this->create_result_response(); - rcl_ret_t ret = rcl_action_take_result_response( - pimpl_->client_handle.get(), &response_header, result_response.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, response_header, result_response)); - } else if (pimpl_->is_cancel_response_ready) { - rmw_request_id_t response_header; - std::shared_ptr cancel_response = this->create_cancel_response(); - rcl_ret_t ret = rcl_action_take_cancel_response( - pimpl_->client_handle.get(), &response_header, cancel_response.get()); - return std::static_pointer_cast( - std::make_shared>>( - ret, response_header, cancel_response)); - } else { + // next_ready_event is an atomic, caching localy + size_t next_ready_event = pimpl_->next_ready_event.exchange(std::numeric_limits::max()); + + if (next_ready_event == std::numeric_limits::max()) { throw std::runtime_error("Taking data from action client but nothing is ready"); } + + return take_data_by_entity_id(next_ready_event); } std::shared_ptr ClientBase::take_data_by_entity_id(size_t id) { + std::shared_ptr data_ptr; + rcl_ret_t ret; + // Mark as ready the entity from which we want to take data switch (static_cast(id)) { case EntityType::GoalClient: - pimpl_->is_goal_response_ready = true; + { + rmw_request_id_t response_header; + std::shared_ptr goal_response; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + + goal_response = this->create_goal_response(); + ret = rcl_action_take_goal_response( + pimpl_->client_handle.get(), &response_header, goal_response.get()); + } + data_ptr = std::make_shared( + ClientBaseData::GoalResponseData( + ret, response_header, goal_response)); + } break; case EntityType::ResultClient: - pimpl_->is_result_response_ready = true; + { + rmw_request_id_t response_header; + std::shared_ptr result_response; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + result_response = this->create_result_response(); + ret = rcl_action_take_result_response( + pimpl_->client_handle.get(), &response_header, result_response.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::ResultResponseData( + ret, response_header, result_response)); + } break; case EntityType::CancelClient: - pimpl_->is_cancel_response_ready = true; + { + rmw_request_id_t response_header; + std::shared_ptr cancel_response; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + cancel_response = this->create_cancel_response(); + ret = rcl_action_take_cancel_response( + pimpl_->client_handle.get(), &response_header, cancel_response.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::CancelResponseData( + ret, response_header, cancel_response)); + } break; case EntityType::FeedbackSubscription: - pimpl_->is_feedback_ready = true; + { + std::shared_ptr feedback_message; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + feedback_message = this->create_feedback_message(); + ret = rcl_action_take_feedback( + pimpl_->client_handle.get(), feedback_message.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::FeedbackReadyData( + ret, feedback_message)); + } break; case EntityType::StatusSubscription: - pimpl_->is_status_ready = true; + { + std::shared_ptr status_message; + { + std::lock_guard lock(pimpl_->action_client_mutex_); + status_message = this->create_status_message(); + ret = rcl_action_take_status( + pimpl_->client_handle.get(), status_message.get()); + } + data_ptr = + std::make_shared( + ClientBaseData::StatusReadyData( + ret, status_message)); + } break; } - return take_data(); + return std::static_pointer_cast(data_ptr); } void -ClientBase::execute(const std::shared_ptr & data) +ClientBase::execute(const std::shared_ptr & data_in) { - if (!data) { - throw std::runtime_error("'data' is empty"); + if (!data_in) { + throw std::invalid_argument("'data_in' is unexpectedly empty"); } - if (pimpl_->is_feedback_ready) { - auto shared_ptr = std::static_pointer_cast>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_feedback_ready = false; - if (RCL_RET_OK == ret) { - auto feedback_message = std::get<1>(*shared_ptr); - this->handle_feedback_message(feedback_message); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking feedback"); - } - } else if (pimpl_->is_status_ready) { - auto shared_ptr = std::static_pointer_cast>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_status_ready = false; - if (RCL_RET_OK == ret) { - auto status_message = std::get<1>(*shared_ptr); - this->handle_status_message(status_message); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking status"); - } - } else if (pimpl_->is_goal_response_ready) { - auto shared_ptr = std::static_pointer_cast< - std::tuple>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_goal_response_ready = false; - if (RCL_RET_OK == ret) { - auto response_header = std::get<1>(*shared_ptr); - auto goal_response = std::get<2>(*shared_ptr); - this->handle_goal_response(response_header, goal_response); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking goal response"); - } - } else if (pimpl_->is_result_response_ready) { - auto shared_ptr = std::static_pointer_cast< - std::tuple>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_result_response_ready = false; - if (RCL_RET_OK == ret) { - auto response_header = std::get<1>(*shared_ptr); - auto result_response = std::get<2>(*shared_ptr); - this->handle_result_response(response_header, result_response); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking result response"); - } - } else if (pimpl_->is_cancel_response_ready) { - auto shared_ptr = std::static_pointer_cast< - std::tuple>>(data); - auto ret = std::get<0>(*shared_ptr); - pimpl_->is_cancel_response_ready = false; - if (RCL_RET_OK == ret) { - auto response_header = std::get<1>(*shared_ptr); - auto cancel_response = std::get<2>(*shared_ptr); - this->handle_cancel_response(response_header, cancel_response); - } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret, "error taking cancel response"); - } - } else { - throw std::runtime_error("Executing action client but nothing is ready"); - } + std::shared_ptr data_ptr = std::static_pointer_cast(data_in); + + std::visit( + [&](auto && data) -> void { + using T = std::decay_t; + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_feedback_message(data.feedback_message); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking feedback"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_status_message(data.status_message); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking status"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_goal_response(data.response_header, data.goal_response); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking goal response"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_result_response(data.response_header, data.result_response); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking result response"); + } + } + if constexpr (std::is_same_v) { + if (RCL_RET_OK == data.ret) { + this->handle_cancel_response(data.response_header, data.cancel_response); + } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) { + rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking cancel response"); + } + } + }, data_ptr->data); } } // namespace rclcpp_action diff --git a/rclcpp_action/src/server.cpp b/rclcpp_action/src/server.cpp index 565aaa4f3b..bc3e7e10b8 100644 --- a/rclcpp_action/src/server.cpp +++ b/rclcpp_action/src/server.cpp @@ -18,7 +18,9 @@ #include #include #include +#include #include +#include #include "rcl_action/action_server.h" #include "rcl_action/wait.h" @@ -26,15 +28,42 @@ #include "rcpputils/scope_exit.hpp" #include "action_msgs/msg/goal_status_array.hpp" -#include "action_msgs/srv/cancel_goal.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp_action/server.hpp" using rclcpp_action::ServerBase; using rclcpp_action::GoalUUID; +struct ServerBaseData; + namespace rclcpp_action { + +struct ServerBaseData +{ + using GoalRequestData = std::tuple>; + + using CancelRequestData = std::tuple, + rmw_request_id_t>; + + using ResultRequestData = std::tuple, rmw_request_id_t>; + + using GoalExpiredData = struct Empty {}; + + std::variant data; + + explicit ServerBaseData(GoalRequestData && dataIn) + : data(std::move(dataIn)) {} + explicit ServerBaseData(CancelRequestData && dataIn) + : data(std::move(dataIn)) {} + explicit ServerBaseData(ResultRequestData && dataIn) + : data(std::move(dataIn)) {} + explicit ServerBaseData(GoalExpiredData && dataIn) + : data(std::move(dataIn)) {} +}; + class ServerBaseImpl { public: @@ -60,11 +89,6 @@ class ServerBaseImpl size_t num_services_ = 0; size_t num_guard_conditions_ = 0; - std::atomic goal_request_ready_{false}; - std::atomic cancel_request_ready_{false}; - std::atomic result_request_ready_{false}; - std::atomic goal_expired_{false}; - // Lock for unordered_maps std::recursive_mutex unordered_map_mutex_; @@ -75,10 +99,15 @@ class ServerBaseImpl // rcl goal handles are kept so api to send result doesn't try to access freed memory std::unordered_map> goal_handles_; + // next ready event for taking, will be set by is_ready and will be processed by take_data + std::atomic next_ready_event; + rclcpp::Logger logger_; }; + } // namespace rclcpp_action + ServerBase::ServerBase( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base, rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock, @@ -194,124 +223,155 @@ ServerBase::is_ready(const rcl_wait_set_t & wait_set) &goal_expired); } - pimpl_->goal_request_ready_ = goal_request_ready; - pimpl_->cancel_request_ready_ = cancel_request_ready; - pimpl_->result_request_ready_ = result_request_ready; - pimpl_->goal_expired_ = goal_expired; - if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - return pimpl_->goal_request_ready_.load() || - pimpl_->cancel_request_ready_.load() || - pimpl_->result_request_ready_.load() || - pimpl_->goal_expired_.load(); -} + pimpl_->next_ready_event = std::numeric_limits::max(); -std::shared_ptr -ServerBase::take_data() -{ - if (pimpl_->goal_request_ready_.load()) { - rcl_ret_t ret; - rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info(); - rmw_request_id_t request_header; + if (goal_request_ready) { + pimpl_->next_ready_event = static_cast(EntityType::GoalService); + return true; + } - std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + if (cancel_request_ready) { + pimpl_->next_ready_event = static_cast(EntityType::CancelService); + return true; + } - std::shared_ptr message = create_goal_request(); - ret = rcl_action_take_goal_request( - pimpl_->action_server_.get(), - &request_header, - message.get()); - - return std::static_pointer_cast( - std::make_shared - >>( - ret, - goal_info, - request_header, message)); - } else if (pimpl_->cancel_request_ready_.load()) { - rcl_ret_t ret; - rmw_request_id_t request_header; + if (result_request_ready) { + pimpl_->next_ready_event = static_cast(EntityType::ResultService); + return true; + } - // Initialize cancel request - auto request = std::make_shared(); + if (goal_expired) { + pimpl_->next_ready_event = static_cast(EntityType::Expired); + return true; + } - std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); - ret = rcl_action_take_cancel_request( - pimpl_->action_server_.get(), - &request_header, - request.get()); + return false; +} - return std::static_pointer_cast( - std::make_shared - , - rmw_request_id_t>>(ret, request, request_header)); - } else if (pimpl_->result_request_ready_.load()) { - rcl_ret_t ret; - // Get the result request message - rmw_request_id_t request_header; - std::shared_ptr result_request = create_result_request(); - std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); - ret = rcl_action_take_result_request( - pimpl_->action_server_.get(), &request_header, result_request.get()); - - return std::static_pointer_cast( - std::make_shared, rmw_request_id_t>>( - ret, result_request, request_header)); - } else if (pimpl_->goal_expired_.load()) { - return nullptr; - } else { - throw std::runtime_error("Taking data from action server but nothing is ready"); +std::shared_ptr +ServerBase::take_data() +{ + size_t next_ready_event = pimpl_->next_ready_event.exchange(std::numeric_limits::max()); + + if (next_ready_event == std::numeric_limits::max()) { + throw std::runtime_error("ServerBase::take_data() called but no data is ready"); } + + return take_data_by_entity_id(next_ready_event); } std::shared_ptr ServerBase::take_data_by_entity_id(size_t id) { + std::shared_ptr data_ptr; // Mark as ready the entity from which we want to take data switch (static_cast(id)) { case EntityType::GoalService: - pimpl_->goal_request_ready_ = true; + { + rcl_ret_t ret; + rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info(); + rmw_request_id_t request_header; + + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + + std::shared_ptr message = create_goal_request(); + ret = rcl_action_take_goal_request( + pimpl_->action_server_.get(), + &request_header, + message.get()); + + data_ptr = std::make_shared( + ServerBaseData::GoalRequestData( + ret, goal_info, request_header, message)); + } break; case EntityType::ResultService: - pimpl_->result_request_ready_ = true; + { + rcl_ret_t ret; + // Get the result request message + rmw_request_id_t request_header; + std::shared_ptr result_request = create_result_request(); + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_take_result_request( + pimpl_->action_server_.get(), &request_header, result_request.get()); + + data_ptr = + std::make_shared( + ServerBaseData::ResultRequestData( + ret, result_request, request_header)); + } break; case EntityType::CancelService: - pimpl_->cancel_request_ready_ = true; + { + rcl_ret_t ret; + rmw_request_id_t request_header; + + // Initialize cancel request + auto request = std::make_shared(); + + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_take_cancel_request( + pimpl_->action_server_.get(), + &request_header, + request.get()); + + data_ptr = + std::make_shared( + ServerBaseData::CancelRequestData( + ret, request, + request_header)); + } + break; + case EntityType::Expired: + { + data_ptr = + std::make_shared(ServerBaseData::GoalExpiredData()); + } break; } - return take_data(); + return std::static_pointer_cast(data_ptr); } void -ServerBase::execute(const std::shared_ptr & data) +ServerBase::execute(const std::shared_ptr & dataIn) { - if (!data && !pimpl_->goal_expired_.load()) { - throw std::runtime_error("'data' is empty"); - } - - if (pimpl_->goal_request_ready_.load()) { - execute_goal_request_received(data); - } else if (pimpl_->cancel_request_ready_.load()) { - execute_cancel_request_received(data); - } else if (pimpl_->result_request_ready_.load()) { - execute_result_request_received(data); - } else if (pimpl_->goal_expired_.load()) { - execute_check_expired_goals(); - } else { - throw std::runtime_error("Executing action server but nothing is ready"); + if (!dataIn) { + throw std::runtime_error("ServerBase::execute: give data pointer was null"); } + + std::shared_ptr dataPtr = std::static_pointer_cast(dataIn); + + std::visit( + [&](auto && data) -> void { + using T = std::decay_t; + if constexpr (std::is_same_v) { + execute_goal_request_received( + std::get<0>(data), std::get<1>(data), std::get<2>(data), + std::get<3>(data)); + } + if constexpr (std::is_same_v) { + execute_cancel_request_received(std::get<0>(data), std::get<1>(data), std::get<2>(data)); + } + if constexpr (std::is_same_v) { + execute_result_request_received(std::get<0>(data), std::get<1>(data), std::get<2>(data)); + } + if constexpr (std::is_same_v) { + execute_check_expired_goals(); + } + }, + dataPtr->data); } void -ServerBase::execute_goal_request_received(const std::shared_ptr & data) +ServerBase::execute_goal_request_received( + rcl_ret_t ret, rcl_action_goal_info_t goal_info, rmw_request_id_t request_header, + const std::shared_ptr message) { - auto shared_ptr = std::static_pointer_cast - >>(data); - rcl_ret_t ret = std::get<0>(*shared_ptr); if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. // This happens when a client shuts down and connext receives a sample saying the client is @@ -320,14 +380,6 @@ ServerBase::execute_goal_request_received(const std::shared_ptr & data) } else if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - rcl_action_goal_info_t goal_info = std::get<1>(*shared_ptr); - rmw_request_id_t request_header = std::get<2>(*shared_ptr); - std::shared_ptr message = std::get<3>(*shared_ptr); - - bool expected = true; - if (!pimpl_->goal_request_ready_.compare_exchange_strong(expected, false)) { - return; - } GoalUUID uuid = get_goal_id_from_goal_request(message.get()); convert(uuid, &goal_info); @@ -408,12 +460,10 @@ ServerBase::execute_goal_request_received(const std::shared_ptr & data) } void -ServerBase::execute_cancel_request_received(const std::shared_ptr & data) +ServerBase::execute_cancel_request_received( + rcl_ret_t ret, std::shared_ptr request, + rmw_request_id_t request_header) { - auto shared_ptr = std::static_pointer_cast - , - rmw_request_id_t>>(data); - auto ret = std::get<0>(*shared_ptr); if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. // This happens when a client shuts down and connext receives a sample saying the client is @@ -422,9 +472,6 @@ ServerBase::execute_cancel_request_received(const std::shared_ptr & data) } else if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - auto request = std::get<1>(*shared_ptr); - auto request_header = std::get<2>(*shared_ptr); - pimpl_->cancel_request_ready_ = false; // Convert c++ message to C message rcl_action_cancel_request_t cancel_request = rcl_action_get_zero_initialized_cancel_request(); @@ -506,11 +553,10 @@ ServerBase::execute_cancel_request_received(const std::shared_ptr & data) } void -ServerBase::execute_result_request_received(const std::shared_ptr & data) +ServerBase::execute_result_request_received( + rcl_ret_t ret, std::shared_ptr result_request, + rmw_request_id_t request_header) { - auto shared_ptr = std::static_pointer_cast - , rmw_request_id_t>>(data); - auto ret = std::get<0>(*shared_ptr); if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. // This happens when a client shuts down and connext receives a sample saying the client is @@ -519,10 +565,7 @@ ServerBase::execute_result_request_received(const std::shared_ptr & data) } else if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - auto result_request = std::get<1>(*shared_ptr); - auto request_header = std::get<2>(*shared_ptr); - pimpl_->result_request_ready_ = false; std::shared_ptr result_response; // check if the goal exists diff --git a/rclcpp_action/test/test_server.cpp b/rclcpp_action/test/test_server.cpp index 9ffa31797f..c63d1dfdfc 100644 --- a/rclcpp_action/test/test_server.cpp +++ b/rclcpp_action/test/test_server.cpp @@ -1110,6 +1110,9 @@ TEST_F(TestGoalRequestServer, is_ready_rcl_error) { EXPECT_NO_THROW(action_server_->add_to_wait_set(wait_set)); EXPECT_TRUE(action_server_->is_ready(wait_set)); + + EXPECT_NO_THROW(action_server_->take_data()); + auto mock = mocking_utils::patch_and_return( "lib:rclcpp_action", rcl_action_server_wait_set_get_entities_ready, RCL_RET_ERROR); EXPECT_THROW(action_server_->is_ready(wait_set), rclcpp::exceptions::RCLError);