Skip to content

Commit

Permalink
to support a feature of content filtered topic
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Lihui <[email protected]>
  • Loading branch information
Chen Lihui authored and Chen Lihui committed Mar 24, 2021
1 parent 2987f9f commit 34d0802
Show file tree
Hide file tree
Showing 10 changed files with 1,170 additions and 33 deletions.
31 changes: 31 additions & 0 deletions rcl/include/rcl/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,37 @@ extern "C"

#define RCL_UNUSED(x) RCUTILS_UNUSED(x)

#define RCL_RET_FROM_RCUTIL_RET(rcl_ret_var, rcutils_expr) \
{ \
rcutils_ret_t rcutils_ret = rcutils_expr; \
if (RCUTILS_RET_OK != rcutils_ret) { \
if (rcutils_error_is_set()) { \
RCL_SET_ERROR_MSG(rcutils_get_error_string().str); \
} else { \
RCL_SET_ERROR_MSG_WITH_FORMAT_STRING("rcutils_ret_t code: %i", rcutils_ret); \
} \
} \
switch (rcutils_ret) { \
case RCUTILS_RET_OK: \
rcl_ret_var = RCL_RET_OK; \
break; \
case RCUTILS_RET_ERROR: \
rcl_ret_var = RCL_RET_ERROR; \
break; \
case RCUTILS_RET_BAD_ALLOC: \
rcl_ret_var = RCL_RET_BAD_ALLOC; \
break; \
case RCUTILS_RET_INVALID_ARGUMENT: \
rcl_ret_var = RCL_RET_INVALID_ARGUMENT; \
break; \
case RCUTILS_RET_NOT_INITIALIZED: \
rcl_ret_var = RCL_RET_NOT_INIT; \
break; \
default: \
rcl_ret_var = RCUTILS_RET_ERROR; \
} \
}

#ifdef __cplusplus
}
#endif
Expand Down
105 changes: 105 additions & 0 deletions rcl/include/rcl/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C"
#include "rcl/macros.h"
#include "rcl/node.h"
#include "rcl/visibility_control.h"
#include "rcutils/types/string_array.h"

#include "rmw/message_sequence.h"

Expand Down Expand Up @@ -208,6 +209,110 @@ RCL_WARN_UNUSED
rcl_subscription_options_t
rcl_subscription_get_default_options(void);

/// Reclaim resources held inside rcl_subscription_options_t structure.
/**
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | No
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Yes
*
* \param[in] option The structure which its resources have to be deallocated.
* \return `RCL_RET_OK` if the memory was successfully freed, or
* \return `RCL_RET_INVALID_ARGUMENT` if option is NULL, or
* if its allocator is invalid and the structure contains initialized memory.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_subscription_options_fini(rcl_subscription_options_t * option);

/// Check if the content filtered topic feature is supported in the subscription.
/**
* Depending on the middleware and whether cft is supported in the subscription.
* this will return true if the middleware can support ContentFilteredTopic in the subscription.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
bool
rcl_subscription_is_cft_supported(const rcl_subscription_t * subscription);

/// Set the filter expression and expression parameters for the subscription.
/**
* This function will set a filter expression and an array of expression parameters
* for the given subscription, but not to update the original rcl_subscription_options_t
* of subscription.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | No
* Thread-Safe | No
* Uses Atomics | Maybe [1]
* Lock-Free | Maybe [1]
*
* \param[in] subscription the subscription object to inspect.
* \param[in] filter_expression A filter_expression is a string that specifies the criteria
* to select the data samples of interest. It is similar to the WHERE part of an SQL clause.
* Using an empty("") string can reset/clean content filtered topic for the subscription.
* \param[in] expression_parameters An expression_parameters is an array of strings that
* give values to the ‘parameters’ (i.e., "%n" tokens begin from 0) in the filter_expression.
* The number of supplied parameters must fit with the requested values in the filter_expression.
* It can be NULL if there is no "%n" tokens placeholder in filter_expression.
* The maximun size allowance depends on concrete DDS vendor.
* (i.e., it cannot be greater than 100 on RTI_Connext.)
* \return `RCL_RET_OK` if the query was successful, or
* \return `RCL_RET_INVALID_ARGUMENT` if `subscription` is NULL, or
* \return `RCL_RET_INVALID_ARGUMENT` if `filter_expression` is NULL, or
* \return `RCL_RET_UNSUPPORTED` if the implementation does not support content filter topic, or
* \return `RCL_RET_ERROR` if an unspecified error occurs.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_subscription_set_cft_expression_parameters(
const rcl_subscription_t * subscription,
const char * filter_expression,
const rcutils_string_array_t * expression_parameters
);

/// Retrieve the filter expression of the subscription.
/**
* This function will return an filter expression by the given subscription.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Yes
* Thread-Safe | No
* Uses Atomics | Maybe [1]
* Lock-Free | Maybe [1]
*
* \param[in] subscription the subscription object to inspect.
* \param[out] filter_expression an filter expression, populated on success.
* It is up to the caller to deallocate the filter expression later on,
* using rcutils_get_default_allocator().deallocate().
* \param[out] expression_parameters Array of expression parameters, populated on success.
* It is up to the caller to finalize this array later on, using rcutils_string_array_fini().
* \return `RCL_RET_OK` if the query was successful, or
* \return `RCL_RET_INVALID_ARGUMENT` if `subscription` is NULL, or
* \return `RCL_RET_INVALID_ARGUMENT` if `filter_expression` is NULL, or
* \return `RCL_RET_INVALID_ARGUMENT` if `expression_parameters` is NULL, or
* \return `RCL_RET_BAD_ALLOC` if memory allocation fails, or
* \return `RCL_RET_UNSUPPORTED` if the implementation does not support content filter topic, or
* \return `RCL_RET_ERROR` if an unspecified error occurs.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_subscription_get_cft_expression_parameters(
const rcl_subscription_t * subscription,
char ** filter_expression,
rcutils_string_array_t * expression_parameters
);

/// Take a ROS message from a topic using a rcl subscription.
/**
* It is the job of the caller to ensure that the type of the ros_message
Expand Down
32 changes: 1 addition & 31 deletions rcl/src/rcl/logging_rosout.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rcl/allocator.h"
#include "rcl/error_handling.h"
#include "rcl/logging_rosout.h"
#include "rcl/macros.h"
#include "rcl/node.h"
#include "rcl/publisher.h"
#include "rcl/time.h"
Expand Down Expand Up @@ -43,37 +44,6 @@ extern "C"
return RCL_RET_OK; \
}

#define RCL_RET_FROM_RCUTIL_RET(rcl_ret_var, rcutils_expr) \
{ \
rcutils_ret_t rcutils_ret = rcutils_expr; \
if (RCUTILS_RET_OK != rcutils_ret) { \
if (rcutils_error_is_set()) { \
RCL_SET_ERROR_MSG(rcutils_get_error_string().str); \
} else { \
RCL_SET_ERROR_MSG_WITH_FORMAT_STRING("rcutils_ret_t code: %i", rcutils_ret); \
} \
} \
switch (rcutils_ret) { \
case RCUTILS_RET_OK: \
rcl_ret_var = RCL_RET_OK; \
break; \
case RCUTILS_RET_ERROR: \
rcl_ret_var = RCL_RET_ERROR; \
break; \
case RCUTILS_RET_BAD_ALLOC: \
rcl_ret_var = RCL_RET_BAD_ALLOC; \
break; \
case RCUTILS_RET_INVALID_ARGUMENT: \
rcl_ret_var = RCL_RET_INVALID_ARGUMENT; \
break; \
case RCUTILS_RET_NOT_INITIALIZED: \
rcl_ret_var = RCL_RET_NOT_INIT; \
break; \
default: \
rcl_ret_var = RCUTILS_RET_ERROR; \
} \
}

typedef struct rosout_map_entry_t
{
rcl_node_t * node;
Expand Down
102 changes: 101 additions & 1 deletion rcl/src/rcl/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ extern "C"
#include "rcl/error_handling.h"
#include "rcl/node.h"
#include "rcutils/logging_macros.h"
#include "rcutils/strdup.h"
#include "rcutils/types/string_array.h"
#include "rmw/error_handling.h"
#include "rmw/validate_full_topic_name.h"
#include "tracetools/tracetools.h"
Expand Down Expand Up @@ -92,6 +94,7 @@ rcl_subscription_init(
sizeof(rcl_subscription_impl_t), allocator->state);
RCL_CHECK_FOR_NULL_WITH_MSG(
subscription->impl, "allocating memory failed", ret = RCL_RET_BAD_ALLOC; goto cleanup);
subscription->impl->options = rcl_subscription_get_default_options();
// Fill out the implemenation struct.
// rmw_handle
// TODO(wjwwood): pass allocator once supported in rmw api.
Expand All @@ -115,8 +118,8 @@ rcl_subscription_init(
}
subscription->impl->actual_qos.avoid_ros_namespace_conventions =
options->qos.avoid_ros_namespace_conventions;
// options
subscription->impl->options = *options;

RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription initialized");
ret = RCL_RET_OK;
TRACEPOINT(
Expand All @@ -138,6 +141,12 @@ rcl_subscription_init(
}
}

ret = rcl_subscription_options_fini(&subscription->impl->options);
if (RCL_RET_OK != ret) {
RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
}

allocator->deallocate(subscription->impl, allocator->state);
subscription->impl = NULL;
}
Expand Down Expand Up @@ -174,6 +183,13 @@ rcl_subscription_fini(rcl_subscription_t * subscription, rcl_node_t * node)
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
result = RCL_RET_ERROR;
}
rcl_ret_t rcl_ret = rcl_subscription_options_fini(&subscription->impl->options);
if (RCL_RET_OK != rcl_ret) {
RCUTILS_SAFE_FWRITE_TO_STDERR(rcl_get_error_string().str);
RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
result = RCL_RET_ERROR;
}

allocator.deallocate(subscription->impl, allocator.state);
subscription->impl = NULL;
}
Expand All @@ -193,6 +209,90 @@ rcl_subscription_get_default_options()
return default_options;
}

RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_subscription_options_fini(rcl_subscription_options_t * option)
{
RCL_CHECK_ARGUMENT_FOR_NULL(option, RCL_RET_INVALID_ARGUMENT);
// fini rmw_subscription_options_t
const rcl_allocator_t * allocator = &option->allocator;
RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
if (option->rmw_subscription_options.filter_expression) {
allocator->deallocate(option->rmw_subscription_options.filter_expression, allocator->state);
option->rmw_subscription_options.filter_expression = NULL;
}

if (option->rmw_subscription_options.expression_parameters) {
rcutils_ret_t ret = rcutils_string_array_fini(
option->rmw_subscription_options.expression_parameters);
if (RCUTILS_RET_OK != ret) {
RCUTILS_SAFE_FWRITE_TO_STDERR("Failed to fini string array.\n");
}
allocator->deallocate(option->rmw_subscription_options.expression_parameters, allocator->state);
option->rmw_subscription_options.expression_parameters = NULL;
}
return RCL_RET_OK;
}

bool
rcl_subscription_is_cft_supported(const rcl_subscription_t * subscription)
{
if (!rcl_subscription_is_valid(subscription)) {
return false;
}
return subscription->impl->rmw_handle->is_cft_supported;
}

rcl_ret_t
rcl_subscription_set_cft_expression_parameters(
const rcl_subscription_t * subscription,
const char * filter_expression,
const rcutils_string_array_t * expression_parameters
)
{
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);

if (!rcl_subscription_is_valid(subscription)) {
return RCL_RET_SUBSCRIPTION_INVALID;
}
RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT);
rmw_ret_t ret = rmw_subscription_set_cft_expression_parameters(
subscription->impl->rmw_handle, filter_expression, expression_parameters);

if (ret != RMW_RET_OK) {
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
return rcl_convert_rmw_ret_to_rcl_ret(ret);
}
return RCL_RET_OK;
}

rcl_ret_t
rcl_subscription_get_cft_expression_parameters(
const rcl_subscription_t * subscription,
char ** filter_expression,
rcutils_string_array_t * expression_parameters
)
{
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);

if (!rcl_subscription_is_valid(subscription)) {
return RCL_RET_SUBSCRIPTION_INVALID;
}
RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(expression_parameters, RCL_RET_INVALID_ARGUMENT);
rmw_ret_t ret = rmw_subscription_get_cft_expression_parameters(
subscription->impl->rmw_handle, filter_expression, expression_parameters);

if (ret != RMW_RET_OK) {
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
return rcl_convert_rmw_ret_to_rcl_ret(ret);
}
return RCL_RET_OK;
}

rcl_ret_t
rcl_take(
const rcl_subscription_t * subscription,
Expand Down
Loading

0 comments on commit 34d0802

Please sign in to comment.