From 9b8b8d5b8848cc9ab18cd54a2c4b407020415617 Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Tue, 14 May 2024 10:43:34 +0200 Subject: [PATCH] Add new participant mutex to VideoRoom (#3361) --- src/plugins/janus_videoroom.c | 46 ++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/src/plugins/janus_videoroom.c b/src/plugins/janus_videoroom.c index 1b9fd0fe8d..97dfdc62aa 100644 --- a/src/plugins/janus_videoroom.c +++ b/src/plugins/janus_videoroom.c @@ -2149,6 +2149,7 @@ typedef struct janus_videoroom_publisher { int udp_sock; /* The udp socket on which to forward rtp packets */ gboolean kicked; /* Whether this participant has been kicked */ gboolean e2ee; /* If media from this publisher is end-to-end encrypted */ + janus_mutex mutex; /* Mutex to lock this instance */ volatile gint destroyed; janus_refcount ref; } janus_videoroom_publisher; @@ -2497,6 +2498,7 @@ static void janus_videoroom_publisher_free(const janus_refcount *p_ref) { janus_mutex_destroy(&p->subscribers_mutex); janus_mutex_destroy(&p->rtp_forwarders_mutex); + janus_mutex_destroy(&p->mutex); /* If this is a dummy publisher, get rid of the session too */ if(p->dummy && p->session) @@ -2817,6 +2819,7 @@ static void janus_videoroom_create_dummy_publisher(janus_videoroom *room, GHashT publisher->rtp_forwarders = g_hash_table_new(NULL, NULL); publisher->udp_sock = -1; g_atomic_int_set(&publisher->destroyed, 0); + janus_mutex_init(&publisher->mutex); janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free); /* Now we create a separate publisher stream for each supported codec in the room */ janus_videoroom_publisher_stream *ps = NULL; @@ -4138,7 +4141,9 @@ static void janus_videoroom_leave_or_unpublish(janus_videoroom_publisher *partic g_hash_table_remove(participant->room->participants, string_ids ? (gpointer)participant->user_id_str : (gpointer)&participant->user_id); g_hash_table_remove(participant->room->private_ids, GUINT_TO_POINTER(participant->pvt_id)); + janus_mutex_lock(&participant->mutex); g_clear_pointer(&participant->room, janus_videoroom_room_dereference); + janus_mutex_unlock(&participant->mutex); } janus_mutex_unlock(&room->mutex); janus_refcount_decrease(&room->ref); @@ -5195,7 +5200,9 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi while (g_hash_table_iter_next(&iter, NULL, &value)) { janus_videoroom_publisher *p = value; if(p && !g_atomic_int_get(&p->destroyed) && p->session && p->room) { + janus_mutex_lock(&p->mutex); g_clear_pointer(&p->room, janus_videoroom_room_dereference); + janus_mutex_unlock(&p->mutex); /* Notify the user we're going to destroy the room... */ int ret = gateway->push_event(p->session->handle, &janus_videoroom_plugin, NULL, destroyed, NULL); JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret)); @@ -7407,6 +7414,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi publisher->rtp_forwarders = g_hash_table_new(NULL, NULL); publisher->udp_sock = -1; g_atomic_int_set(&publisher->destroyed, 0); + janus_mutex_init(&publisher->mutex); janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free); /* Create publisher streams for all the things that the remote publisher is sending */ janus_videoroom_publisher_stream *ps = NULL; @@ -8120,11 +8128,19 @@ void janus_videoroom_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp janus_videoroom_incoming_rtp_internal(session, participant, pkt); } static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *session, janus_videoroom_publisher *participant, janus_plugin_rtp *pkt) { - if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams || participant->room == NULL) { + if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams) { janus_videoroom_publisher_dereference_nodebug(participant); return; } + janus_mutex_lock(&participant->mutex); janus_videoroom *videoroom = participant->room; + if(videoroom == NULL) { + janus_mutex_unlock(&participant->mutex); + janus_videoroom_publisher_dereference_nodebug(participant); + return; + } + janus_refcount_increase_nodebug(&videoroom->ref); + janus_mutex_unlock(&participant->mutex); /* Find the stream this packet belongs to */ janus_mutex_lock(&participant->streams_mutex); @@ -8137,6 +8153,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi if(ps != NULL) janus_refcount_decrease_nodebug(&ps->ref); janus_videoroom_publisher_dereference_nodebug(participant); + janus_refcount_decrease_nodebug(&videoroom->ref); return; } @@ -8281,6 +8298,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi char *payload = janus_rtp_payload(buf, len, &plen); if(payload == NULL) { janus_videoroom_publisher_dereference_nodebug(participant); + janus_refcount_decrease_nodebug(&videoroom->ref); return; } if(ps->vcodec == JANUS_VIDEOCODEC_VP9) { @@ -8351,6 +8369,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi char *payload = janus_rtp_payload(buf, len, &plen); if(payload == NULL) { janus_videoroom_publisher_dereference_nodebug(participant); + janus_refcount_decrease_nodebug(&videoroom->ref); return; } if(ps->vcodec == JANUS_VIDEOCODEC_VP8) { @@ -8378,6 +8397,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi } janus_refcount_decrease_nodebug(&ps->ref); janus_videoroom_publisher_dereference_nodebug(participant); + janus_refcount_decrease_nodebug(&videoroom->ref); } void janus_videoroom_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet) { @@ -8450,12 +8470,22 @@ static void janus_videoroom_incoming_data_internal(janus_videoroom_session *sess janus_videoroom_publisher_dereference_nodebug(participant); return; } - if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams || participant->room == NULL) { + if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams) { + janus_videoroom_publisher_dereference_nodebug(participant); + return; + } + janus_mutex_lock(&participant->mutex); + janus_videoroom *videoroom = participant->room; + if(videoroom == NULL) { + janus_mutex_unlock(&participant->mutex); janus_videoroom_publisher_dereference_nodebug(participant); return; } + janus_refcount_increase_nodebug(&videoroom->ref); + janus_mutex_unlock(&participant->mutex); if(g_atomic_int_get(&participant->destroyed) || participant->data_mindex < 0 || !participant->streams || participant->kicked) { janus_videoroom_publisher_dereference_nodebug(participant); + janus_refcount_decrease_nodebug(&videoroom->ref); return; } char *buf = packet->buffer; @@ -8527,14 +8557,15 @@ static void janus_videoroom_incoming_data_internal(janus_videoroom_session *sess pkt.is_rtp = FALSE; pkt.textdata = !packet->binary; janus_mutex_lock_nodebug(&ps->subscribers_mutex); - if(participant->room->helper_threads > 0) { - g_list_foreach(participant->room->threads, janus_videoroom_helper_rtpdata_packet, &pkt); + if(videoroom->helper_threads > 0) { + g_list_foreach(videoroom->threads, janus_videoroom_helper_rtpdata_packet, &pkt); } else { g_slist_foreach(ps->subscribers, janus_videoroom_relay_data_packet, &pkt); } janus_mutex_unlock_nodebug(&ps->subscribers_mutex); janus_refcount_decrease_nodebug(&ps->ref); janus_videoroom_publisher_dereference_nodebug(participant); + janus_refcount_decrease_nodebug(&videoroom->ref); } void janus_videoroom_data_ready(janus_plugin_session *handle) { @@ -8791,7 +8822,11 @@ static void janus_videoroom_hangup_media_internal(gpointer session_data) { g_list_free_full(mappings, (GDestroyNotify)g_free); } /* Any subscriber session to update? */ + janus_mutex_lock(&participant->mutex); janus_videoroom *room = participant->room; + if(room) + janus_refcount_increase_nodebug(&room->ref); + janus_mutex_unlock(&participant->mutex); if(subscribers != NULL) { temp = subscribers; while(temp) { @@ -8855,6 +8890,8 @@ static void janus_videoroom_hangup_media_internal(gpointer session_data) { janus_mutex_unlock(&participant->streams_mutex); janus_videoroom_leave_or_unpublish(participant, FALSE, FALSE); janus_refcount_decrease(&participant->ref); + if(room) + janus_refcount_decrease_nodebug(&room->ref); } else if(session->participant_type == janus_videoroom_p_type_subscriber) { /* Get rid of subscriber */ janus_videoroom_subscriber *subscriber = janus_videoroom_session_get_subscriber(session); @@ -9189,6 +9226,7 @@ static void *janus_videoroom_handler(void *data) { } } g_atomic_int_set(&publisher->destroyed, 0); + janus_mutex_init(&publisher->mutex); janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free); /* In case we also wanted to configure */ if(audiocodec && json_string_value(json_object_get(msg->jsep, "sdp")) != NULL) {