Skip to content

Commit

Permalink
Fixes "Taking data from the action action but nothing is ready" excep…
Browse files Browse the repository at this point in the history
…tion (#1)

* [rclcpp_action] fix race condition between take_data() and execute()
* [rclcpp] skip adding waitable to triggered list if it's already in it
  • Loading branch information
medvedevigorek committed Nov 11, 2023
1 parent 45df355 commit c364c3c
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,13 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}
for (size_t i = 0; i < waitable_handles_.size(); ++i) {
if (waitable_handles_[i]->is_ready(wait_set)) {
waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i]));
auto it = std::find(
waitable_triggered_handles_.begin(),
waitable_triggered_handles_.end(),
waitable_handles_[i]);
if (it == waitable_triggered_handles_.end()) {
waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i]));
}
}
}

Expand Down
118 changes: 46 additions & 72 deletions rclcpp_action/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ class ClientBaseImpl
std::default_random_engine, 8, unsigned int> random_bytes_generator;
};

using DataMessage = std::tuple<rcl_ret_t, ClientBase::EntityType, std::shared_ptr<void>>;
using ResponseMessage = std::tuple<rmw_request_id_t, std::shared_ptr<void>>;

ClientBase::ClientBase(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
Expand Down Expand Up @@ -552,42 +555,47 @@ ClientBase::take_data()
{
if (pimpl_->is_feedback_ready) {
std::shared_ptr<void> feedback_message = this->create_feedback_message();
rcl_ret_t ret = rcl_action_take_feedback(
pimpl_->client_handle.get(), feedback_message.get());
rcl_ret_t ret = rcl_action_take_feedback(pimpl_->client_handle.get(), feedback_message.get());
pimpl_->is_feedback_ready = false;
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(
ret, feedback_message));
std::make_shared<DataMessage>(ret, EntityType::FeedbackSubscription, feedback_message));
} else if (pimpl_->is_status_ready) {
std::shared_ptr<void> status_message = this->create_status_message();
rcl_ret_t ret = rcl_action_take_status(
pimpl_->client_handle.get(), status_message.get());
rcl_ret_t ret = rcl_action_take_status(pimpl_->client_handle.get(), status_message.get());
pimpl_->is_status_ready = false;
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(
ret, status_message));
std::make_shared<DataMessage>(ret, EntityType::StatusSubscription, status_message));
} else if (pimpl_->is_goal_response_ready) {
rmw_request_id_t response_header;
std::shared_ptr<void> goal_response = this->create_goal_response();
rcl_ret_t ret = rcl_action_take_goal_response(
pimpl_->client_handle.get(), &response_header, goal_response.get());
pimpl_->is_goal_response_ready = false;
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(
ret, response_header, goal_response));
std::make_shared<DataMessage>(
ret, EntityType::GoalClient,
std::make_shared<ResponseMessage>(response_header, goal_response)));
} else if (pimpl_->is_result_response_ready) {
rmw_request_id_t response_header;
std::shared_ptr<void> result_response = this->create_result_response();
rcl_ret_t ret = rcl_action_take_result_response(
pimpl_->client_handle.get(), &response_header, result_response.get());
pimpl_->client_handle.get(),
&response_header, result_response.get());
pimpl_->is_result_response_ready = false;
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(
ret, response_header, result_response));
std::make_shared<DataMessage>(
ret, EntityType::ResultClient,
std::make_shared<ResponseMessage>(response_header, result_response)));
} else if (pimpl_->is_cancel_response_ready) {
rmw_request_id_t response_header;
std::shared_ptr<void> cancel_response = this->create_cancel_response();
rcl_ret_t ret = rcl_action_take_cancel_response(
pimpl_->client_handle.get(), &response_header, cancel_response.get());
pimpl_->is_cancel_response_ready = false;
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(
ret, response_header, cancel_response));
std::make_shared<DataMessage>(
ret, EntityType::CancelClient,
std::make_shared<ResponseMessage>(response_header, cancel_response)));
} else {
throw std::runtime_error("Taking data from action client but nothing is ready");
}
Expand Down Expand Up @@ -625,64 +633,30 @@ ClientBase::execute(std::shared_ptr<void> & data)
throw std::runtime_error("'data' is empty");
}

if (pimpl_->is_feedback_ready) {
auto shared_ptr = std::static_pointer_cast<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(data);
auto ret = std::get<0>(*shared_ptr);
pimpl_->is_feedback_ready = false;
if (RCL_RET_OK == ret) {
auto feedback_message = std::get<1>(*shared_ptr);
this->handle_feedback_message(feedback_message);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error taking feedback");
}
} else if (pimpl_->is_status_ready) {
auto shared_ptr = std::static_pointer_cast<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(data);
auto ret = std::get<0>(*shared_ptr);
pimpl_->is_status_ready = false;
if (RCL_RET_OK == ret) {
auto status_message = std::get<1>(*shared_ptr);
this->handle_status_message(status_message);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error taking status");
}
} else if (pimpl_->is_goal_response_ready) {
auto shared_ptr = std::static_pointer_cast<
std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
auto ret = std::get<0>(*shared_ptr);
pimpl_->is_goal_response_ready = false;
if (RCL_RET_OK == ret) {
auto response_header = std::get<1>(*shared_ptr);
auto goal_response = std::get<2>(*shared_ptr);
this->handle_goal_response(response_header, goal_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error taking goal response");
}
} else if (pimpl_->is_result_response_ready) {
auto shared_ptr = std::static_pointer_cast<
std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
auto ret = std::get<0>(*shared_ptr);
pimpl_->is_result_response_ready = false;
if (RCL_RET_OK == ret) {
auto response_header = std::get<1>(*shared_ptr);
auto result_response = std::get<2>(*shared_ptr);
this->handle_result_response(response_header, result_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error taking result response");
}
} else if (pimpl_->is_cancel_response_ready) {
auto shared_ptr = std::static_pointer_cast<
std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
auto ret = std::get<0>(*shared_ptr);
pimpl_->is_cancel_response_ready = false;
if (RCL_RET_OK == ret) {
auto response_header = std::get<1>(*shared_ptr);
auto cancel_response = std::get<2>(*shared_ptr);
this->handle_cancel_response(response_header, cancel_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error taking cancel response");
}
auto data_ptr = std::static_pointer_cast<DataMessage>(data);
auto ret = std::get<0>(*data_ptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "execute is called on invalid data");
}

auto type = std::get<1>(*data_ptr);
if (EntityType::FeedbackSubscription == type) {
auto message = std::get<2>(*data_ptr);
handle_feedback_message(message);
} else if (EntityType::StatusSubscription == type) {
auto message = std::get<2>(*data_ptr);
this->handle_status_message(message);
} else if (EntityType::GoalClient == type) {
auto message = std::static_pointer_cast<ResponseMessage>(std::get<2>(*data_ptr));
handle_goal_response(std::get<0>(*message), std::get<1>(*message));
} else if (EntityType::ResultClient == type) {
auto message = std::static_pointer_cast<ResponseMessage>(std::get<2>(*data_ptr));
handle_result_response(std::get<0>(*message), std::get<1>(*message));
} else if (EntityType::CancelClient == type) {
auto message = std::static_pointer_cast<ResponseMessage>(std::get<2>(*data_ptr));
handle_cancel_response(std::get<0>(*message), std::get<1>(*message));
} else {
throw std::runtime_error("Executing action client but nothing is ready");
throw std::logic_error("Unknown entity type");
}
}

Expand Down

0 comments on commit c364c3c

Please sign in to comment.