From 212e4fcbe3fc5d6cb6e6a63fae201c5026fdf348 Mon Sep 17 00:00:00 2001 From: Alessandro Toppi Date: Wed, 4 Sep 2024 13:40:08 +0200 Subject: [PATCH] Fix handling of "data" stream parameters in the streaming plugin (#3412) --- src/plugins/janus_streaming.c | 51 ++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/src/plugins/janus_streaming.c b/src/plugins/janus_streaming.c index aa5cd2f9a3..b578257228 100644 --- a/src/plugins/janus_streaming.c +++ b/src/plugins/janus_streaming.c @@ -1118,6 +1118,7 @@ static struct janus_json_parameter rtp_media_parameters[] = { {"port3", JANUS_JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"svc", JANUS_JSON_BOOL, 0}, /* Data only */ + {"datatype", JANUS_JSON_STRING, 0}, {"buffermsg", JANUS_JSON_BOOL, 0}, }; static struct janus_json_parameter rtp_audio_parameters[] = { @@ -2181,8 +2182,8 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { janus_config_item *vsps = janus_config_get(config, m, janus_config_type_item, "h264sps"); janus_config_item *vkf = janus_config_get(config, m, janus_config_type_item, "bufferkf"); janus_config_item *vsc = janus_config_get(config, m, janus_config_type_item, "simulcast"); - janus_config_item *dbm = janus_config_get(config, cat, janus_config_type_item, "buffermsg"); - janus_config_item *dt = janus_config_get(config, cat, janus_config_type_item, "datatype"); + janus_config_item *dbm = janus_config_get(config, m, janus_config_type_item, "buffermsg"); + janus_config_item *dt = janus_config_get(config, m, janus_config_type_item, "datatype"); janus_config_item *vport2 = janus_config_get(config, m, janus_config_type_item, "port2"); janus_config_item *vport3 = janus_config_get(config, m, janus_config_type_item, "port3"); janus_config_item *vsvc = janus_config_get(config, m, janus_config_type_item, "svc"); @@ -4682,7 +4683,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi if(stream->type == JANUS_STREAMING_MEDIA_DATA) janus_config_add(config, m, janus_config_item_create("datatype", stream->textdata ? "text" : "binary")); if(stream->buffermsg) - janus_config_add(config, m, janus_config_item_create("databuffermsg", "true")); + janus_config_add(config, m, janus_config_item_create("buffermsg", "true")); temp = temp->next; } } @@ -5587,15 +5588,6 @@ void janus_streaming_setup_media(janus_plugin_session *handle) { } janus_mutex_unlock(&stream->keyframe.mutex); } - if(stream->buffermsg) { - JANUS_LOG(LOG_HUGE, "Any recent datachannel message to send? (%s)\n", stream->mid); - janus_mutex_lock(&stream->buffermsg_mutex); - if(stream->last_msg != NULL) { - JANUS_LOG(LOG_HUGE, "Yep!\n"); - janus_streaming_relay_rtp_packet(session, stream->last_msg); - } - janus_mutex_unlock(&stream->buffermsg_mutex); - } /* If this mountpoint has RTCP support, send a PLI */ if(stream->type == JANUS_STREAMING_MEDIA_VIDEO) janus_streaming_rtcp_pli_send(stream); @@ -5679,9 +5671,31 @@ void janus_streaming_data_ready(janus_plugin_session *handle) { janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle; if(!session || g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup)) return; + janus_refcount_increase(&session->ref); if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) { JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_STREAMING_PACKAGE, handle); + /* Try to send a buffered datachannel message when datachannel is ready */ + GList *temp = session->streams; + janus_streaming_session_stream *s; + janus_streaming_rtp_source_stream *stream; + while(temp) { + s = (janus_streaming_session_stream *)temp->data; + stream = s->stream; + if(stream->buffermsg) { + janus_refcount_increase(&stream->ref); + JANUS_LOG(LOG_VERB, "[%s-%p] Trying to send the most recent message (%s)\n", JANUS_STREAMING_PACKAGE, handle, stream->mid); + janus_mutex_lock(&stream->buffermsg_mutex); + if(stream->last_msg != NULL) { + JANUS_LOG(LOG_HUGE, "Buffered datachannel message found!\n"); + janus_streaming_relay_rtp_packet(session, stream->last_msg); + } + janus_mutex_unlock(&stream->buffermsg_mutex); + janus_refcount_decrease(&stream->ref); + } + temp = temp->next; + } } + janus_refcount_decrease(&session->ref); } void janus_streaming_hangup_media(janus_plugin_session *handle) { @@ -9949,14 +9963,19 @@ static void *janus_streaming_relay_thread(void *data) { /* Are we keeping track of the last message being relayed? */ if(stream->buffermsg) { janus_mutex_lock(&stream->buffermsg_mutex); + if(stream->last_msg != NULL) { + janus_streaming_rtp_relay_packet_free((janus_streaming_rtp_relay_packet *)stream->last_msg); + stream->last_msg = NULL; + } janus_streaming_rtp_relay_packet *pkt = g_malloc0(sizeof(janus_streaming_rtp_relay_packet)); pkt->data = g_malloc(bytes); memcpy(pkt->data, data, bytes); - packet.mindex = stream->mindex; - packet.is_rtp = FALSE; - packet.is_data = TRUE; - packet.textdata = stream->textdata; + pkt->mindex = stream->mindex; + pkt->is_data = TRUE; + pkt->textdata = stream->textdata; pkt->length = bytes; + /* Store the latest message */ + stream->last_msg = pkt; janus_mutex_unlock(&stream->buffermsg_mutex); } /* Go! */