Skip to content

Commit

Permalink
zbus: channel msg subscriber pool isolation
Browse files Browse the repository at this point in the history
Currently, zbus uses a single global `net_buf`pool to publish messages to
msg_subscribers. It would be good to have a way to separate the pools for
channels related to critical parts of the systems to avoid publication
failure on these particular channels. These channels will not use the
global pool. They can set an isolated pool by calling the
`zbus_chan_set_msg_sub_pool.`

Signed-off-by: Rodrigo Peixoto <[email protected]>
  • Loading branch information
rodrigopex authored and nashif committed Jun 18, 2024
1 parent da70246 commit 3b10caf
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 5 deletions.
28 changes: 28 additions & 0 deletions include/zephyr/zbus/zbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ struct zbus_channel_data {
*/
sys_slist_t observers;
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION) || defined(__DOXYGEN__)
/** Net buf pool for message subscribers. It can be either the global or a separated one.
*/
struct net_buf_pool *msg_subscriber_pool;
#endif /* ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION */
};

/**
Expand Down Expand Up @@ -347,6 +353,9 @@ struct zbus_channel_observation {
.user_data = _user_data, \
.validator = _validator, \
.data = &_CONCAT(_zbus_chan_data_, _name), \
IF_ENABLED(ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION, ( \
.msg_subscriber_pool = &_zbus_msg_subscribers_pool, \
)) \
}; \
/* Extern declaration of observers */ \
ZBUS_OBS_DECLARE(_observers); \
Expand Down Expand Up @@ -684,6 +693,25 @@ static inline void *zbus_chan_user_data(const struct zbus_channel *chan)
return chan->user_data;
}

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION) || defined(__DOXYGEN__)

/**
* @brief Set the channel's msg subscriber `net_buf` pool.
*
* @param chan The channel's reference.
* @param pool The reference to the `net_buf` memory pool.
*/
static inline void zbus_chan_set_msg_sub_pool(const struct zbus_channel *chan,
struct net_buf_pool *pool)
{
__ASSERT(chan != NULL, "chan is required");
__ASSERT(pool != NULL, "pool is required");

chan->data->msg_subscriber_pool = pool;
}

#endif /* ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION */

#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__)

/**
Expand Down
4 changes: 4 additions & 0 deletions subsys/zbus/Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ config ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_STATIC

endchoice

config ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION
default n
bool "Use isolated pools instead of only using the global pool."

config ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE
default 16
int "The count of net_buf available to be used simutaneously."
Expand Down
13 changes: 8 additions & 5 deletions subsys/zbus/zbus.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ static struct k_spinlock obs_slock;

NET_BUF_POOL_HEAP_DEFINE(_zbus_msg_subscribers_pool, CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE,
sizeof(struct zbus_channel *), NULL);

BUILD_ASSERT(K_HEAP_MEM_POOL_SIZE > 0, "MSG_SUBSCRIBER feature requires heap memory pool.");

static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
k_timeout_t timeout)
{
return net_buf_alloc_len(&_zbus_msg_subscribers_pool, size, timeout);
return net_buf_alloc_len(pool, size, timeout);
}

#else
Expand All @@ -47,7 +48,7 @@ static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, si
"CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE must be greater or equal to "
"%d",
(int)size);
return net_buf_alloc(&_zbus_msg_subscribers_pool, timeout);
return net_buf_alloc(pool, timeout);
}
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */

Expand Down Expand Up @@ -129,8 +130,11 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t
struct zbus_channel_observation_mask *observation_mask;

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
buf = _zbus_create_net_buf(&_zbus_msg_subscribers_pool, zbus_chan_msg_size(chan),
sys_timepoint_timeout(end_time));
struct net_buf_pool *pool =
COND_CODE_1(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION,
(chan->data->msg_subscriber_pool), (&_zbus_msg_subscribers_pool));

buf = _zbus_create_net_buf(pool, zbus_chan_msg_size(chan), sys_timepoint_timeout(end_time));

_ZBUS_ASSERT(buf != NULL, "net_buf zbus_msg_subscribers_pool is "
"unavailable or heap is full");
Expand Down Expand Up @@ -179,7 +183,6 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t
struct zbus_observer_node *obs_nd, *tmp;

SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {

const struct zbus_observer *obs = obs_nd->obs;

if (!obs->data->enabled) {
Expand Down

0 comments on commit 3b10caf

Please sign in to comment.