diff --git a/rcl/include/rcl/macros.h b/rcl/include/rcl/macros.h index 4df9ff427..973dec116 100644 --- a/rcl/include/rcl/macros.h +++ b/rcl/include/rcl/macros.h @@ -27,6 +27,37 @@ extern "C" #define RCL_UNUSED(x) RCUTILS_UNUSED(x) +#define RCL_RET_FROM_RCUTIL_RET(rcl_ret_var, rcutils_expr) \ + { \ + rcutils_ret_t rcutils_ret = rcutils_expr; \ + if (RCUTILS_RET_OK != rcutils_ret) { \ + if (rcutils_error_is_set()) { \ + RCL_SET_ERROR_MSG(rcutils_get_error_string().str); \ + } else { \ + RCL_SET_ERROR_MSG_WITH_FORMAT_STRING("rcutils_ret_t code: %i", rcutils_ret); \ + } \ + } \ + switch (rcutils_ret) { \ + case RCUTILS_RET_OK: \ + rcl_ret_var = RCL_RET_OK; \ + break; \ + case RCUTILS_RET_ERROR: \ + rcl_ret_var = RCL_RET_ERROR; \ + break; \ + case RCUTILS_RET_BAD_ALLOC: \ + rcl_ret_var = RCL_RET_BAD_ALLOC; \ + break; \ + case RCUTILS_RET_INVALID_ARGUMENT: \ + rcl_ret_var = RCL_RET_INVALID_ARGUMENT; \ + break; \ + case RCUTILS_RET_NOT_INITIALIZED: \ + rcl_ret_var = RCL_RET_NOT_INIT; \ + break; \ + default: \ + rcl_ret_var = RCUTILS_RET_ERROR; \ + } \ + } + #ifdef __cplusplus } #endif diff --git a/rcl/include/rcl/subscription.h b/rcl/include/rcl/subscription.h index 452cb694b..3e08f42ae 100644 --- a/rcl/include/rcl/subscription.h +++ b/rcl/include/rcl/subscription.h @@ -27,6 +27,7 @@ extern "C" #include "rcl/macros.h" #include "rcl/node.h" #include "rcl/visibility_control.h" +#include "rcutils/types/string_array.h" #include "rmw/message_sequence.h" @@ -208,6 +209,110 @@ RCL_WARN_UNUSED rcl_subscription_options_t rcl_subscription_get_default_options(void); +/// Reclaim resources held inside rcl_subscription_options_t structure. +/** + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | No + * Thread-Safe | No + * Uses Atomics | No + * Lock-Free | Yes + * + * \param[in] option The structure which its resources have to be deallocated. + * \return `RCL_RET_OK` if the memory was successfully freed, or + * \return `RCL_RET_INVALID_ARGUMENT` if option is NULL, or + * if its allocator is invalid and the structure contains initialized memory. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rcl_ret_t +rcl_subscription_options_fini(rcl_subscription_options_t * option); + +/// Check if the content filtered topic feature is supported in the subscription. +/** + * Depending on the middleware and whether cft is supported in the subscription. + * this will return true if the middleware can support ContentFilteredTopic in the subscription. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +bool +rcl_subscription_is_cft_supported(const rcl_subscription_t * subscription); + +/// Set the filter expression and expression parameters for the subscription. +/** + * This function will set a filter expression and an array of expression parameters + * for the given subscription, but not to update the original rcl_subscription_options_t + * of subscription. + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | No + * Thread-Safe | No + * Uses Atomics | Maybe [1] + * Lock-Free | Maybe [1] + * + * \param[in] subscription the subscription object to inspect. + * \param[in] filter_expression A filter_expression is a string that specifies the criteria + * to select the data samples of interest. It is similar to the WHERE part of an SQL clause. + * Using an empty("") string can reset/clean content filtered topic for the subscription. + * \param[in] expression_parameters An expression_parameters is an array of strings that + * give values to the ‘parameters’ (i.e., "%n" tokens begin from 0) in the filter_expression. + * The number of supplied parameters must fit with the requested values in the filter_expression. + * It can be NULL if there is no "%n" tokens placeholder in filter_expression. + * The maximun size allowance depends on concrete DDS vendor. + * (i.e., it cannot be greater than 100 on RTI_Connext.) + * \return `RCL_RET_OK` if the query was successful, or + * \return `RCL_RET_INVALID_ARGUMENT` if `subscription` is NULL, or + * \return `RCL_RET_INVALID_ARGUMENT` if `filter_expression` is NULL, or + * \return `RCL_RET_UNSUPPORTED` if the implementation does not support content filter topic, or + * \return `RCL_RET_ERROR` if an unspecified error occurs. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rcl_ret_t +rcl_subscription_set_cft_expression_parameters( + const rcl_subscription_t * subscription, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters +); + +/// Retrieve the filter expression of the subscription. +/** + * This function will return an filter expression by the given subscription. + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | Yes + * Thread-Safe | No + * Uses Atomics | Maybe [1] + * Lock-Free | Maybe [1] + * + * \param[in] subscription the subscription object to inspect. + * \param[out] filter_expression an filter expression, populated on success. + * It is up to the caller to deallocate the filter expression later on, + * using rcutils_get_default_allocator().deallocate(). + * \param[out] expression_parameters Array of expression parameters, populated on success. + * It is up to the caller to finalize this array later on, using rcutils_string_array_fini(). + * \return `RCL_RET_OK` if the query was successful, or + * \return `RCL_RET_INVALID_ARGUMENT` if `subscription` is NULL, or + * \return `RCL_RET_INVALID_ARGUMENT` if `filter_expression` is NULL, or + * \return `RCL_RET_INVALID_ARGUMENT` if `expression_parameters` is NULL, or + * \return `RCL_RET_BAD_ALLOC` if memory allocation fails, or + * \return `RCL_RET_UNSUPPORTED` if the implementation does not support content filter topic, or + * \return `RCL_RET_ERROR` if an unspecified error occurs. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rcl_ret_t +rcl_subscription_get_cft_expression_parameters( + const rcl_subscription_t * subscription, + char ** filter_expression, + rcutils_string_array_t * expression_parameters +); + /// Take a ROS message from a topic using a rcl subscription. /** * It is the job of the caller to ensure that the type of the ros_message diff --git a/rcl/src/rcl/logging_rosout.c b/rcl/src/rcl/logging_rosout.c index 4133a52f6..c1803f72a 100644 --- a/rcl/src/rcl/logging_rosout.c +++ b/rcl/src/rcl/logging_rosout.c @@ -15,6 +15,7 @@ #include "rcl/allocator.h" #include "rcl/error_handling.h" #include "rcl/logging_rosout.h" +#include "rcl/macros.h" #include "rcl/node.h" #include "rcl/publisher.h" #include "rcl/time.h" @@ -43,37 +44,6 @@ extern "C" return RCL_RET_OK; \ } -#define RCL_RET_FROM_RCUTIL_RET(rcl_ret_var, rcutils_expr) \ - { \ - rcutils_ret_t rcutils_ret = rcutils_expr; \ - if (RCUTILS_RET_OK != rcutils_ret) { \ - if (rcutils_error_is_set()) { \ - RCL_SET_ERROR_MSG(rcutils_get_error_string().str); \ - } else { \ - RCL_SET_ERROR_MSG_WITH_FORMAT_STRING("rcutils_ret_t code: %i", rcutils_ret); \ - } \ - } \ - switch (rcutils_ret) { \ - case RCUTILS_RET_OK: \ - rcl_ret_var = RCL_RET_OK; \ - break; \ - case RCUTILS_RET_ERROR: \ - rcl_ret_var = RCL_RET_ERROR; \ - break; \ - case RCUTILS_RET_BAD_ALLOC: \ - rcl_ret_var = RCL_RET_BAD_ALLOC; \ - break; \ - case RCUTILS_RET_INVALID_ARGUMENT: \ - rcl_ret_var = RCL_RET_INVALID_ARGUMENT; \ - break; \ - case RCUTILS_RET_NOT_INITIALIZED: \ - rcl_ret_var = RCL_RET_NOT_INIT; \ - break; \ - default: \ - rcl_ret_var = RCUTILS_RET_ERROR; \ - } \ - } - typedef struct rosout_map_entry_t { rcl_node_t * node; diff --git a/rcl/src/rcl/subscription.c b/rcl/src/rcl/subscription.c index 611994356..dd459b060 100644 --- a/rcl/src/rcl/subscription.c +++ b/rcl/src/rcl/subscription.c @@ -24,6 +24,8 @@ extern "C" #include "rcl/error_handling.h" #include "rcl/node.h" #include "rcutils/logging_macros.h" +#include "rcutils/strdup.h" +#include "rcutils/types/string_array.h" #include "rmw/error_handling.h" #include "rmw/validate_full_topic_name.h" #include "tracetools/tracetools.h" @@ -92,6 +94,7 @@ rcl_subscription_init( sizeof(rcl_subscription_impl_t), allocator->state); RCL_CHECK_FOR_NULL_WITH_MSG( subscription->impl, "allocating memory failed", ret = RCL_RET_BAD_ALLOC; goto cleanup); + subscription->impl->options = rcl_subscription_get_default_options(); // Fill out the implemenation struct. // rmw_handle // TODO(wjwwood): pass allocator once supported in rmw api. @@ -115,8 +118,8 @@ rcl_subscription_init( } subscription->impl->actual_qos.avoid_ros_namespace_conventions = options->qos.avoid_ros_namespace_conventions; - // options subscription->impl->options = *options; + RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription initialized"); ret = RCL_RET_OK; TRACEPOINT( @@ -138,6 +141,12 @@ rcl_subscription_init( } } + ret = rcl_subscription_options_fini(&subscription->impl->options); + if (RCL_RET_OK != ret) { + RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str); + RCUTILS_SAFE_FWRITE_TO_STDERR("\n"); + } + allocator->deallocate(subscription->impl, allocator->state); subscription->impl = NULL; } @@ -174,6 +183,13 @@ rcl_subscription_fini(rcl_subscription_t * subscription, rcl_node_t * node) RCL_SET_ERROR_MSG(rmw_get_error_string().str); result = RCL_RET_ERROR; } + rcl_ret_t rcl_ret = rcl_subscription_options_fini(&subscription->impl->options); + if (RCL_RET_OK != rcl_ret) { + RCUTILS_SAFE_FWRITE_TO_STDERR(rcl_get_error_string().str); + RCUTILS_SAFE_FWRITE_TO_STDERR("\n"); + result = RCL_RET_ERROR; + } + allocator.deallocate(subscription->impl, allocator.state); subscription->impl = NULL; } @@ -193,6 +209,90 @@ rcl_subscription_get_default_options() return default_options; } +RCL_PUBLIC +RCL_WARN_UNUSED +rcl_ret_t +rcl_subscription_options_fini(rcl_subscription_options_t * option) +{ + RCL_CHECK_ARGUMENT_FOR_NULL(option, RCL_RET_INVALID_ARGUMENT); + // fini rmw_subscription_options_t + const rcl_allocator_t * allocator = &option->allocator; + RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT); + if (option->rmw_subscription_options.filter_expression) { + allocator->deallocate(option->rmw_subscription_options.filter_expression, allocator->state); + option->rmw_subscription_options.filter_expression = NULL; + } + + if (option->rmw_subscription_options.expression_parameters) { + rcutils_ret_t ret = rcutils_string_array_fini( + option->rmw_subscription_options.expression_parameters); + if (RCUTILS_RET_OK != ret) { + RCUTILS_SAFE_FWRITE_TO_STDERR("Failed to fini string array.\n"); + } + allocator->deallocate(option->rmw_subscription_options.expression_parameters, allocator->state); + option->rmw_subscription_options.expression_parameters = NULL; + } + return RCL_RET_OK; +} + +bool +rcl_subscription_is_cft_supported(const rcl_subscription_t * subscription) +{ + if (!rcl_subscription_is_valid(subscription)) { + return false; + } + return subscription->impl->rmw_handle->is_cft_supported; +} + +rcl_ret_t +rcl_subscription_set_cft_expression_parameters( + const rcl_subscription_t * subscription, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters +) +{ + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID); + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT); + + if (!rcl_subscription_is_valid(subscription)) { + return RCL_RET_SUBSCRIPTION_INVALID; + } + RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT); + rmw_ret_t ret = rmw_subscription_set_cft_expression_parameters( + subscription->impl->rmw_handle, filter_expression, expression_parameters); + + if (ret != RMW_RET_OK) { + RCL_SET_ERROR_MSG(rmw_get_error_string().str); + return rcl_convert_rmw_ret_to_rcl_ret(ret); + } + return RCL_RET_OK; +} + +rcl_ret_t +rcl_subscription_get_cft_expression_parameters( + const rcl_subscription_t * subscription, + char ** filter_expression, + rcutils_string_array_t * expression_parameters +) +{ + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID); + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT); + + if (!rcl_subscription_is_valid(subscription)) { + return RCL_RET_SUBSCRIPTION_INVALID; + } + RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT); + RCL_CHECK_ARGUMENT_FOR_NULL(expression_parameters, RCL_RET_INVALID_ARGUMENT); + rmw_ret_t ret = rmw_subscription_get_cft_expression_parameters( + subscription->impl->rmw_handle, filter_expression, expression_parameters); + + if (ret != RMW_RET_OK) { + RCL_SET_ERROR_MSG(rmw_get_error_string().str); + return rcl_convert_rmw_ret_to_rcl_ret(ret); + } + return RCL_RET_OK; +} + rcl_ret_t rcl_take( const rcl_subscription_t * subscription, diff --git a/rcl/test/rcl/test_subscription.cpp b/rcl/test/rcl/test_subscription.cpp index adedd7bb9..c5e64d383 100644 --- a/rcl/test/rcl/test_subscription.cpp +++ b/rcl/test/rcl/test_subscription.cpp @@ -23,6 +23,7 @@ #include "rmw/rmw.h" #include "rmw/validate_full_topic_name.h" +#include "rcutils/strdup.h" #include "rcutils/testing/fault_injection.h" #include "test_msgs/msg/basic_types.h" #include "test_msgs/msg/strings.h" @@ -847,6 +848,388 @@ TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_bad_return_l rcl_subscription_fini(&subscription, this->node_ptr)) << rcl_get_error_string().str; } + +/* A subscription with a content filtered topic setting. + */ +TEST_F( + CLASSNAME( + TestSubscriptionFixture, + RMW_IMPLEMENTATION), test_subscription_content_filtered) { + bool is_vendor_support_cft = + (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0); + + rcl_ret_t ret; + rcl_publisher_t publisher = rcl_get_zero_initialized_publisher(); + const rosidl_message_type_support_t * ts = + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings); + constexpr char topic[] = "rcl_test_subscription_content_filtered_chatter"; + rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options(); + ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + rcl_subscription_t subscription = rcl_get_zero_initialized_subscription(); + rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options(); + subscription_options.rmw_subscription_options.filter_expression = + rcutils_strdup("string_value MATCH 'FilteredData'", subscription_options.allocator); + ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + if (is_vendor_support_cft) { + ASSERT_TRUE(rcl_subscription_is_cft_supported(&subscription)); + } else { + ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription)); + } + ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100)); + constexpr char test_string[] = "NotFilteredData"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + if (is_vendor_support_cft) { + ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + } else { + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + constexpr char test_filtered_string[] = "FilteredData"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_filtered_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + // set filter + { + rcutils_string_array_t expression_parameters; + expression_parameters = rcutils_get_zero_initialized_string_array(); + rcl_allocator_t allocator = rcl_get_default_allocator(); + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &expression_parameters, 1, &allocator); + ASSERT_EQ(RCUTILS_RET_OK, rcutils_ret) << rcl_get_error_string().str; + expression_parameters.data[0] = rcutils_strdup("'FilteredOtherData'", allocator); + + ret = rcl_subscription_set_cft_expression_parameters( + &subscription, "string_value MATCH %0", &expression_parameters); + if (is_vendor_support_cft) { + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } else { + ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); + } + + rcutils_ret = rcutils_string_array_fini(&expression_parameters); + ASSERT_EQ(RCUTILS_RET_OK, rcutils_ret) << rcl_get_error_string().str; + } + + // publish FilteredData again + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + if (is_vendor_support_cft) { + ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + } else { + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_filtered_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + constexpr char test_filtered_other_string[] = "FilteredOtherData"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_other_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_filtered_other_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + // get filter + char * filter_expression = nullptr; + rcutils_string_array_t expression_parameters; + expression_parameters = rcutils_get_zero_initialized_string_array(); + ret = rcl_subscription_get_cft_expression_parameters( + &subscription, &filter_expression, &expression_parameters); + if (is_vendor_support_cft) { + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string("string_value MATCH %0"), + std::string(filter_expression)); + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + allocator.deallocate(filter_expression, allocator.state); + ASSERT_EQ(static_cast(1), expression_parameters.size); + ASSERT_EQ( + std::string("'FilteredOtherData'"), + std::string(expression_parameters.data[0])); + ASSERT_EQ(RCUTILS_RET_OK, rcutils_string_array_fini(&expression_parameters)); + } else { + ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); + } + + // reset filter + { + ret = rcl_subscription_set_cft_expression_parameters( + &subscription, "", NULL); + if (is_vendor_support_cft) { + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_TRUE(wait_for_established_subscription(&publisher, 50, 100)); + ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription)); + } else { + ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); + } + } + + // publish no filtered data again + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + + // it might take 'FilteredData' again, if so, contine to take data + std::string data = std::string(msg.string_value.data, msg.string_value.size); + if (is_vendor_support_cft) { + const int try_total_num = 3; + int i = 0; + while (data != test_string && i++ < try_total_num) { + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + data = std::string(msg.string_value.data, msg.string_value.size); + } + } + ASSERT_EQ(std::string(test_string), data); + } +} + +/* A subscription without a content filtered topic setting at beginning. + */ +TEST_F( + CLASSNAME( + TestSubscriptionFixture, RMW_IMPLEMENTATION), test_subscription_not_begin_content_filtered) { + bool is_vendor_support_cft = + (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0); + + rcl_ret_t ret; + rcl_publisher_t publisher = rcl_get_zero_initialized_publisher(); + const rosidl_message_type_support_t * ts = + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings); + constexpr char topic[] = "rcl_test_subscription_not_begin_content_filtered_chatter"; + rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options(); + ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + rcl_subscription_t subscription = rcl_get_zero_initialized_subscription(); + // not to set filter expression + rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options(); + ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription)); + ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100)); + + // publish no filtered data + constexpr char test_string[] = "NotFilteredData"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + // set filter + { + ret = rcl_subscription_set_cft_expression_parameters( + &subscription, "string_value MATCH 'FilteredData'", NULL); + if (is_vendor_support_cft) { + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } else { + ASSERT_EQ(RCL_RET_UNSUPPORTED, ret); + } + } + + if (is_vendor_support_cft) { + ASSERT_TRUE(rcl_subscription_is_cft_supported(&subscription)); + } else { + ASSERT_FALSE(rcl_subscription_is_cft_supported(&subscription)); + } + // publish no filtered data again + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + if (is_vendor_support_cft) { + ASSERT_FALSE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + } else { + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_string), + std::string(msg.string_value.data, msg.string_value.size)); + } + + // publish filtered data + constexpr char test_filtered_string[] = "FilteredData"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_filtered_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100)); + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__Strings__fini(&msg); + }); + ret = rcl_take(&subscription, &msg, nullptr, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ( + std::string(test_filtered_string), + std::string(msg.string_value.data, msg.string_value.size)); + } +} + TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_get_options) { rcl_ret_t ret; const rosidl_message_type_support_t * ts = @@ -1087,6 +1470,8 @@ TEST_F(CLASSNAME(TestSubscriptionFixtureInit, RMW_IMPLEMENTATION), test_subscrip rcl_reset_error(); EXPECT_EQ(NULL, rcl_subscription_get_options(nullptr)); rcl_reset_error(); + EXPECT_FALSE(rcl_subscription_is_cft_supported(nullptr)); + rcl_reset_error(); EXPECT_EQ(NULL, rcl_subscription_get_actual_qos(&subscription_zero_init)); rcl_reset_error(); @@ -1098,6 +1483,111 @@ TEST_F(CLASSNAME(TestSubscriptionFixtureInit, RMW_IMPLEMENTATION), test_subscrip rcl_reset_error(); EXPECT_EQ(NULL, rcl_subscription_get_options(&subscription_zero_init)); rcl_reset_error(); + EXPECT_FALSE(rcl_subscription_is_cft_supported(&subscription_zero_init)); + rcl_reset_error(); +} + +/* Test for all failure modes in rcl_subscription_set_cft_expression_parameters function. + */ +TEST_F( + CLASSNAME( + TestSubscriptionFixtureInit, + RMW_IMPLEMENTATION), test_bad_rcl_subscription_set_cft_expression_parameters) { + EXPECT_EQ( + RCL_RET_SUBSCRIPTION_INVALID, + rcl_subscription_set_cft_expression_parameters(nullptr, nullptr, nullptr)); + rcl_reset_error(); + + EXPECT_EQ( + RCL_RET_SUBSCRIPTION_INVALID, + rcl_subscription_set_cft_expression_parameters(&subscription_zero_init, nullptr, nullptr)); + rcl_reset_error(); + + EXPECT_EQ( + RCL_RET_INVALID_ARGUMENT, + rcl_subscription_set_cft_expression_parameters(&subscription, nullptr, nullptr)); + rcl_reset_error(); + + { + auto mock = mocking_utils::patch_and_return( + "lib:rcl", rmw_subscription_set_cft_expression_parameters, RMW_RET_UNSUPPORTED); + std::string filter_expression = "data MATCH '0'"; + EXPECT_EQ( + RMW_RET_UNSUPPORTED, + rcl_subscription_set_cft_expression_parameters( + &subscription, filter_expression.c_str(), + nullptr)); + rcl_reset_error(); + } + + { + auto mock = mocking_utils::patch_and_return( + "lib:rcl", rmw_subscription_set_cft_expression_parameters, RMW_RET_ERROR); + std::string filter_expression = "data MATCH '0'"; + EXPECT_EQ( + RCL_RET_ERROR, + rcl_subscription_set_cft_expression_parameters( + &subscription, filter_expression.c_str(), + nullptr)); + rcl_reset_error(); + } +} + +/* Test for all failure modes in rcl_subscription_get_cft_expression_parameters function. + */ +TEST_F( + CLASSNAME( + TestSubscriptionFixtureInit, + RMW_IMPLEMENTATION), test_bad_rcl_subscription_get_cft_expression_parameters) { + EXPECT_EQ( + RCL_RET_SUBSCRIPTION_INVALID, + rcl_subscription_get_cft_expression_parameters(nullptr, nullptr, nullptr)); + rcl_reset_error(); + + EXPECT_EQ( + RCL_RET_SUBSCRIPTION_INVALID, + rcl_subscription_get_cft_expression_parameters(&subscription_zero_init, nullptr, nullptr)); + rcl_reset_error(); + + EXPECT_EQ( + RCL_RET_INVALID_ARGUMENT, + rcl_subscription_get_cft_expression_parameters(&subscription, nullptr, nullptr)); + rcl_reset_error(); + + char * filter_expression = NULL; + rcutils_string_array_t parameters; + + EXPECT_EQ( + RCL_RET_INVALID_ARGUMENT, + rcl_subscription_get_cft_expression_parameters(&subscription, &filter_expression, nullptr)); + rcl_reset_error(); + + EXPECT_EQ( + RCL_RET_INVALID_ARGUMENT, + rcl_subscription_get_cft_expression_parameters(&subscription, nullptr, ¶meters)); + rcl_reset_error(); + + { + auto mock = mocking_utils::patch_and_return( + "lib:rcl", rmw_subscription_get_cft_expression_parameters, RMW_RET_UNSUPPORTED); + EXPECT_EQ( + RMW_RET_UNSUPPORTED, + rcl_subscription_get_cft_expression_parameters( + &subscription, &filter_expression, + ¶meters)); + rcl_reset_error(); + } + + { + auto mock = mocking_utils::patch_and_return( + "lib:rcl", rmw_subscription_get_cft_expression_parameters, RMW_RET_ERROR); + EXPECT_EQ( + RCL_RET_ERROR, + rcl_subscription_get_cft_expression_parameters( + &subscription, &filter_expression, + ¶meters)); + rcl_reset_error(); + } } TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_init_fini_maybe_fail) diff --git a/rcl_action/include/rcl_action/action_client.h b/rcl_action/include/rcl_action/action_client.h index 430d05e66..810d9994f 100644 --- a/rcl_action/include/rcl_action/action_client.h +++ b/rcl_action/include/rcl_action/action_client.h @@ -24,6 +24,7 @@ extern "C" #include "rcl_action/visibility_control.h" #include "rcl/macros.h" #include "rcl/node.h" +#include "rcl/subscription.h" /// Internal action client implementation struct. @@ -741,6 +742,64 @@ bool rcl_action_client_is_valid( const rcl_action_client_t * action_client); +/// Add a goal uuid. +/** + * This function is to add a goal uuid to the map of rcl_action_client_t + * and then try to set content filtered topic if it is supported. + * + * The caller must provide a lock to call this interface + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | No + * Thread-Safe | No + * Uses Atomics | No + * Lock-Free | Yes + * + * \param[in] action_client handle to the client that will take the goal response + * \param[in] uuid pointer to a uuid which length is 16 + * \return `RCL_RET_OK` if success on setting a goal uuid, or + * \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or + * \return `RCL_RET_ACTION_CLIENT_INVALID` if the action client is invalid, or + * \return `RCL_RET_UNSUPPORTED` if setting content filtered topic is not supported + * in the middleware, or + * \return `RCL_RET_ERROR` if an unspecified error occurs. + */ +RCL_ACTION_PUBLIC +rcl_ret_t rcl_action_add_goal_uuid( + const rcl_action_client_t * action_client, + const uint8_t * uuid); + +/// Remove a goal uuid. +/** + * This function is to remove a goal uuid from the map of rcl_action_client_t + * and then try to reset content filtered topic if it is supported. + * + * The caller must provide a lock to call this interface + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | No + * Thread-Safe | No + * Uses Atomics | No + * Lock-Free | Yes + * + * \param[in] action_client handle to the client that will take the goal response + * \param[in] uuid pointer to a uuid which length is 16 + * \return `RCL_RET_OK` if success on removing a goal uuid, or + * \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or + * \return `RCL_RET_ACTION_CLIENT_INVALID` if the action client is invalid, or + * \return `RCL_RET_UNSUPPORTED` if setting content filtered topic is not supported + * in the middleware, or + * \return `RCL_RET_ERROR` if an unspecified error occurs. + */ +RCL_ACTION_PUBLIC +rcl_ret_t rcl_action_remove_goal_uuid( + const rcl_action_client_t * action_client, + const uint8_t * uuid); + #ifdef __cplusplus } #endif diff --git a/rcl_action/include/rcl_action/types.h b/rcl_action/include/rcl_action/types.h index d5f60e446..8aa108583 100644 --- a/rcl_action/include/rcl_action/types.h +++ b/rcl_action/include/rcl_action/types.h @@ -58,6 +58,25 @@ extern "C" #define uuidcmp(uuid0, uuid1) (0 == memcmp(uuid0, uuid1, UUID_SIZE)) #define zerouuid (uint8_t[UUID_SIZE]) {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} #define uuidcmpzero(uuid) uuidcmp(uuid, (zerouuid)) +#define UUID_FMT \ + "%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X" +#define UUID_FMT_ARGS(uuid) \ + uuid[0], \ + uuid[1], \ + uuid[2], \ + uuid[3], \ + uuid[4], \ + uuid[5], \ + uuid[6], \ + uuid[7], \ + uuid[8], \ + uuid[9], \ + uuid[10], \ + uuid[11], \ + uuid[12], \ + uuid[13], \ + uuid[14], \ + uuid[15] // Forward declare typedef struct rcl_action_server_t rcl_action_server_t; diff --git a/rcl_action/src/rcl_action/action_client.c b/rcl_action/src/rcl_action/action_client.c index a03a61ec3..0fe4d329a 100644 --- a/rcl_action/src/rcl_action/action_client.c +++ b/rcl_action/src/rcl_action/action_client.c @@ -33,6 +33,7 @@ extern "C" #include "rcl/wait.h" #include "rcutils/logging_macros.h" +#include "rcutils/format_string.h" #include "rcutils/strdup.h" #include "rmw/qos_profiles.h" @@ -63,7 +64,8 @@ _rcl_action_get_zero_initialized_client_impl(void) 0, 0, 0, - 0 + 0, + rcutils_get_zero_initialized_hash_map() }; return null_action_client; } @@ -92,6 +94,30 @@ _rcl_action_client_fini_impl( ret = RCL_RET_ERROR; } allocator.deallocate(action_client->impl->action_name, allocator.state); + if (NULL != action_client->impl->goal_uuids.impl) { + uint8_t uuid[UUID_SIZE]; + char * value = NULL; + rcl_allocator_t default_allocator = rcl_get_default_allocator(); + rcutils_ret_t hashmap_ret = rcutils_hash_map_get_next_key_and_data( + &action_client->impl->goal_uuids, NULL, uuid, &value); + while (RCUTILS_RET_OK == hashmap_ret) { + RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "remove a uuid: %s", value); + default_allocator.deallocate(value, default_allocator.state); + RCL_RET_FROM_RCUTIL_RET( + ret, rcutils_hash_map_unset(&action_client->impl->goal_uuids, uuid)); + if (ret == RCL_RET_OK) { + hashmap_ret = rcutils_hash_map_get_next_key_and_data( + &action_client->impl->goal_uuids, NULL, uuid, &value); + } + } + if (RCUTILS_RET_HASH_MAP_NO_MORE_ENTRIES != hashmap_ret) { + RCL_RET_FROM_RCUTIL_RET(ret, hashmap_ret); + } + if (RCL_RET_OK == ret) { + RCL_RET_FROM_RCUTIL_RET(ret, rcutils_hash_map_fini(&action_client->impl->goal_uuids)); + } + } + allocator.deallocate(action_client->impl, allocator.state); action_client->impl = NULL; RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Action client finalized"); @@ -171,6 +197,26 @@ _rcl_action_client_fini_impl( goto fail; \ } +size_t hash_map_uuid_hash_func(const void * uuid) +{ + const uint8_t * ckey_str = (const uint8_t *) uuid; + size_t hash = 5381; + + for (size_t i = 0; i < UUID_SIZE; ++i) { + const char c = *(ckey_str++); + hash = ((hash << 5) + hash) + (size_t)c; /* hash * 33 + c */ + } + + return hash; +} + +int hash_map_uuid_cmp_func(const void * val1, const void * val2) +{ + const uint8_t * cval1 = (const uint8_t *)val1; + const uint8_t * cval2 = (const uint8_t *)val2; + return memcmp(cval1, cval2, UUID_SIZE); +} + rcl_ret_t rcl_action_client_init( rcl_action_client_t * action_client, @@ -222,6 +268,15 @@ rcl_action_client_init( SUBSCRIPTION_INIT(feedback); SUBSCRIPTION_INIT(status); + // Initialize goal_uuids map + RCL_RET_FROM_RCUTIL_RET( + ret, rcutils_hash_map_init( + &action_client->impl->goal_uuids, 2, sizeof(uint8_t[16]), sizeof(const char **), + hash_map_uuid_hash_func, hash_map_uuid_cmp_func, &allocator)); + if (RCL_RET_OK != ret) { + goto fail; + } + RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Action client initialized"); return ret; fail: @@ -649,6 +704,210 @@ rcl_action_client_wait_set_get_entities_ready( return RCL_RET_OK; } +static char * +to_uuid_string(const uint8_t * uuid, rcl_allocator_t allocator) +{ + char * uuid_str = rcutils_format_string(allocator, UUID_FMT, UUID_FMT_ARGS(uuid)); + return uuid_str; +} + +static +rcl_ret_t set_content_filtered_topic( + const rcl_action_client_t * action_client) +{ + rcl_ret_t ret; + rcutils_ret_t rcutils_ret; + uint8_t uuid[UUID_SIZE]; + char * uuid_str = NULL; + size_t size; + char * feedback_filter = NULL; + char * feedback_filter_update = NULL; + rcl_allocator_t allocator = rcl_get_default_allocator(); + + feedback_filter = rcutils_strdup("", allocator); + if (NULL == feedback_filter) { + RCL_SET_ERROR_MSG("failed to allocate memory for feedback filter string"); + ret = RCL_RET_BAD_ALLOC; + goto clean; + } + + RCL_RET_FROM_RCUTIL_RET( + ret, rcutils_hash_map_get_size(&action_client->impl->goal_uuids, &size)); + if (RCL_RET_OK != ret) { + goto clean; + } + RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "size: %zu", size); + if (0 == size) { + // mainly to reset content filtered topic with empty string + goto set_cft; + } + + RCL_RET_FROM_RCUTIL_RET( + ret, rcutils_hash_map_get_next_key_and_data( + &action_client->impl->goal_uuids, NULL, uuid, &uuid_str)); + if (RCL_RET_OK != ret) { + goto clean; + } + + feedback_filter_update = + rcutils_format_string(allocator, "goal_id.uuid = &hex(%s)", uuid_str); + if (NULL == feedback_filter_update) { + RCL_SET_ERROR_MSG("failed to format string for feedback filter"); + ret = RCL_RET_BAD_ALLOC; + goto clean; + } + allocator.deallocate(feedback_filter, allocator.state); + feedback_filter = feedback_filter_update; + + while (RCUTILS_RET_OK == (rcutils_ret = rcutils_hash_map_get_next_key_and_data( + &action_client->impl->goal_uuids, uuid, uuid, &uuid_str))) + { + feedback_filter_update = rcutils_format_string( + allocator, "%s or goal_id.uuid = &hex(%s)", feedback_filter, uuid_str); + if (NULL == feedback_filter_update) { + RCL_SET_ERROR_MSG("failed to format string for feedback filter"); + ret = RCL_RET_BAD_ALLOC; + goto clean; + } + allocator.deallocate(feedback_filter, allocator.state); + feedback_filter = feedback_filter_update; + } + if (RCUTILS_RET_HASH_MAP_NO_MORE_ENTRIES != rcutils_ret) { + RCL_RET_FROM_RCUTIL_RET(ret, rcutils_ret); + } + if (RCL_RET_OK != ret) { + goto clean; + } + +set_cft: + + RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "feedback_filter: %s", feedback_filter); + ret = rcl_subscription_set_cft_expression_parameters( + &action_client->impl->feedback_subscription, feedback_filter, NULL); + if (RCL_RET_OK != ret) { + goto clean; + } + + // TODO(iuhilnehc-ynos) set status_subscription with filter expression for the sequence + // by wildcard if rti_connext_dds support this feature in the future, + // otherwise, it seems it's necessary to update the action message type or something else. + +clean: + + allocator.deallocate(feedback_filter, allocator.state); + return ret; +} + +rcl_ret_t rcl_action_add_goal_uuid( + const rcl_action_client_t * action_client, + const uint8_t * uuid) +{ + if (!rcl_action_client_is_valid(action_client)) { + return RCL_RET_ACTION_CLIENT_INVALID; /* error already set */ + } + RCL_CHECK_ARGUMENT_FOR_NULL(uuid, RCL_RET_INVALID_ARGUMENT); + + rcl_ret_t ret; + char * uuid_str = NULL; + rcl_allocator_t allocator = rcl_get_default_allocator(); + uuid_str = to_uuid_string(uuid, allocator); + if (NULL == uuid_str) { + RCL_SET_ERROR_MSG("failed to allocate memory for uuid value"); + ret = RCL_RET_BAD_ALLOC; + goto end; + } + + RCL_RET_FROM_RCUTIL_RET( + ret, rcutils_hash_map_set(&action_client->impl->goal_uuids, uuid, &uuid_str)); + if (RCL_RET_OK != ret) { + goto clean; + } + + // to set content filtered topic + RCUTILS_LOG_DEBUG_NAMED( + ROS_PACKAGE_NAME, "set content filtered topic after adding a uuid: %s", uuid_str); + ret = set_content_filtered_topic(action_client); + if (RCL_RET_OK != ret) { + char * err = rcutils_strdup(rcl_get_error_string().str, allocator); + if (NULL == err) { + RCUTILS_LOG_ERROR_NAMED(ROS_PACKAGE_NAME, "failed to allocate memory for error"); + ret = RCL_RET_BAD_ALLOC; + goto clean; + } + rcl_reset_error(); + RCL_SET_ERROR_MSG_WITH_FORMAT_STRING("failed to set_content_filtered_topic: %s", err); + allocator.deallocate(err, allocator.state); + } + goto end; + +clean: + allocator.deallocate(uuid_str, allocator.state); + +end: + return ret; +} + +rcl_ret_t rcl_action_remove_goal_uuid( + const rcl_action_client_t * action_client, + const uint8_t * uuid) +{ + if (!rcl_action_client_is_valid(action_client)) { + return RCL_RET_ACTION_CLIENT_INVALID; /* error already set */ + } + RCL_CHECK_ARGUMENT_FOR_NULL(uuid, RCL_RET_INVALID_ARGUMENT); + + rcl_ret_t ret; + char * uuid_str = NULL; + char * uuid_value = NULL; + rcl_allocator_t allocator = rcl_get_default_allocator(); + uuid_str = to_uuid_string(uuid, allocator); + if (NULL == uuid_str) { + RCL_SET_ERROR_MSG("failed to allocate memory for uuid value"); + ret = RCL_RET_BAD_ALLOC; + goto end; + } + if (!rcutils_hash_map_key_exists(&action_client->impl->goal_uuids, uuid)) { + RCL_SET_ERROR_MSG_WITH_FORMAT_STRING( + "item key [%s] not found in the map of goal uuids", + uuid_str); + ret = RCL_RET_ERROR; + goto end; + } + + RCL_RET_FROM_RCUTIL_RET( + ret, rcutils_hash_map_get(&action_client->impl->goal_uuids, uuid, &uuid_value)); + if (RCL_RET_OK != ret) { + goto end; + } + allocator.deallocate(uuid_value, allocator.state); + + RCL_RET_FROM_RCUTIL_RET( + ret, rcutils_hash_map_unset(&action_client->impl->goal_uuids, uuid)); + if (RCL_RET_OK != ret) { + goto end; + } + + RCUTILS_LOG_DEBUG_NAMED( + ROS_PACKAGE_NAME, "set content filtered topic after removing a uuid: %s", uuid_str); + ret = set_content_filtered_topic(action_client); + if (RCL_RET_OK != ret) { + rcl_allocator_t allocator = rcl_get_default_allocator(); + char * err = rcutils_strdup(rcl_get_error_string().str, allocator); + if (NULL == err) { + RCUTILS_LOG_ERROR_NAMED(ROS_PACKAGE_NAME, "failed to allocate memory for error"); + ret = RCL_RET_BAD_ALLOC; + goto end; + } + rcl_reset_error(); + RCL_SET_ERROR_MSG_WITH_FORMAT_STRING("failed to set_content_filtered_topic: %s", err); + allocator.deallocate(err, allocator.state); + } + +end: + allocator.deallocate(uuid_str, allocator.state); + return ret; +} + #ifdef __cplusplus } #endif diff --git a/rcl_action/src/rcl_action/action_client_impl.h b/rcl_action/src/rcl_action/action_client_impl.h index 00cdd6125..e4bccdfea 100644 --- a/rcl_action/src/rcl_action/action_client_impl.h +++ b/rcl_action/src/rcl_action/action_client_impl.h @@ -18,6 +18,8 @@ #include "rcl_action/types.h" #include "rcl/rcl.h" +#include "rcutils/types.h" + typedef struct rcl_action_client_impl_t { rcl_client_t goal_client; @@ -33,6 +35,7 @@ typedef struct rcl_action_client_impl_t size_t wait_set_result_client_index; size_t wait_set_feedback_subscription_index; size_t wait_set_status_subscription_index; + rcutils_hash_map_t goal_uuids; } rcl_action_client_impl_t; diff --git a/rcl_action/test/rcl_action/test_action_communication.cpp b/rcl_action/test/rcl_action/test_action_communication.cpp index 7f59fc333..bad51bfe6 100644 --- a/rcl_action/test/rcl_action/test_action_communication.cpp +++ b/rcl_action/test/rcl_action/test_action_communication.cpp @@ -13,6 +13,8 @@ // limitations under the License. #include +#include + #include "osrf_testing_tools_cpp/scope_exit.hpp" #include "rcl_action/action_client.h" @@ -652,6 +654,105 @@ TEST_F(CLASSNAME(TestActionCommunication, RMW_IMPLEMENTATION), test_valid_feedba test_msgs__action__Fibonacci_FeedbackMessage__fini(&outgoing_feedback); } +TEST_F(CLASSNAME(TestActionCommunication, RMW_IMPLEMENTATION), test_valid_feedback_comm_cft) +{ + bool is_vendor_support_cft = + (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0); + + #define FEEDBACK_SIZE 2 + test_msgs__action__Fibonacci_FeedbackMessage outgoing_feedback[FEEDBACK_SIZE]; + test_msgs__action__Fibonacci_FeedbackMessage incoming_feedback[FEEDBACK_SIZE]; + for (size_t i = 0; i < FEEDBACK_SIZE; ++i) { + test_msgs__action__Fibonacci_FeedbackMessage__init(&outgoing_feedback[i]); + test_msgs__action__Fibonacci_FeedbackMessage__init(&incoming_feedback[i]); + } + + uint8_t uuid[UUID_SIZE]; + init_test_uuid0(uuid); + rcl_ret_t ret = rcl_action_add_goal_uuid(&this->action_client, uuid); + if (is_vendor_support_cft) { + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + } else { + EXPECT_EQ(ret, RCL_RET_UNSUPPORTED) << rcl_get_error_string().str; + } + + // Initialize feedback + for (size_t i = 0; i < FEEDBACK_SIZE; ++i) { + ASSERT_TRUE( + rosidl_runtime_c__int32__Sequence__init( + &outgoing_feedback[i].feedback.sequence, 3)); + outgoing_feedback[i].feedback.sequence.data[0] = 0; + outgoing_feedback[i].feedback.sequence.data[1] = 1; + outgoing_feedback[i].feedback.sequence.data[2] = 2; + i == 0 ? init_test_uuid0(outgoing_feedback[i].goal_id.uuid) : + init_test_uuid1(outgoing_feedback[i].goal_id.uuid); + } + + // Publish feedback with valid arguments + for (size_t i = 0; i < FEEDBACK_SIZE; ++i) { + ret = rcl_action_publish_feedback(&this->action_server, &outgoing_feedback[i]); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + + ret = rcl_wait_set_clear(&this->wait_set); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + + ret = rcl_action_wait_set_add_action_client( + &this->wait_set, &this->action_client, NULL, NULL); + ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + + ret = rcl_wait(&this->wait_set, RCL_S_TO_NS(10)); + if (0 == i || !is_vendor_support_cft) { + ASSERT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + ret = rcl_action_client_wait_set_get_entities_ready( + &this->wait_set, + &this->action_client, + &this->is_feedback_ready, + &this->is_status_ready, + &this->is_goal_response_ready, + &this->is_cancel_response_ready, + &this->is_result_response_ready); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + + EXPECT_TRUE(this->is_feedback_ready); + EXPECT_FALSE(this->is_status_ready); + EXPECT_FALSE(this->is_result_response_ready); + EXPECT_FALSE(this->is_cancel_response_ready); + EXPECT_FALSE(this->is_goal_response_ready); + + // Take feedback with valid arguments + ret = rcl_action_take_feedback(&this->action_client, &incoming_feedback[i]); + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + + // Check that feedback was received correctly + EXPECT_TRUE( + uuidcmp( + outgoing_feedback[i].goal_id.uuid, + incoming_feedback[i].goal_id.uuid)); + ASSERT_EQ( + outgoing_feedback[i].feedback.sequence.size, incoming_feedback[i].feedback.sequence.size); + EXPECT_TRUE( + !memcmp( + outgoing_feedback[i].feedback.sequence.data, + incoming_feedback[i].feedback.sequence.data, + outgoing_feedback[i].feedback.sequence.size)); + } else { + EXPECT_EQ(ret, RCL_RET_TIMEOUT) << rcl_get_error_string().str; + } + } + + ret = rcl_action_remove_goal_uuid(&this->action_client, uuid); + if (is_vendor_support_cft) { + EXPECT_EQ(ret, RCL_RET_OK) << rcl_get_error_string().str; + } else { + EXPECT_EQ(ret, RCL_RET_UNSUPPORTED) << rcl_get_error_string().str; + } + + for (size_t i = 0; i < FEEDBACK_SIZE; ++i) { + test_msgs__action__Fibonacci_FeedbackMessage__fini(&outgoing_feedback[i]); + test_msgs__action__Fibonacci_FeedbackMessage__fini(&incoming_feedback[i]); + } +} + TEST_F(CLASSNAME(TestActionCommunication, RMW_IMPLEMENTATION), test_invalid_goal_request_opts) { test_msgs__action__Fibonacci_SendGoal_Request outgoing_goal_request;