Skip to content

Commit

Permalink
Fix handling of "data" stream parameters in the streaming plugin (#3412)
Browse files Browse the repository at this point in the history
  • Loading branch information
atoppi committed Sep 4, 2024
1 parent 65216f2 commit 212e4fc
Showing 1 changed file with 35 additions and 16 deletions.
51 changes: 35 additions & 16 deletions src/plugins/janus_streaming.c
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,7 @@ static struct janus_json_parameter rtp_media_parameters[] = {
{"port3", JANUS_JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
{"svc", JANUS_JSON_BOOL, 0},
/* Data only */
{"datatype", JANUS_JSON_STRING, 0},
{"buffermsg", JANUS_JSON_BOOL, 0},
};
static struct janus_json_parameter rtp_audio_parameters[] = {
Expand Down Expand Up @@ -2181,8 +2182,8 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
janus_config_item *vsps = janus_config_get(config, m, janus_config_type_item, "h264sps");
janus_config_item *vkf = janus_config_get(config, m, janus_config_type_item, "bufferkf");
janus_config_item *vsc = janus_config_get(config, m, janus_config_type_item, "simulcast");
janus_config_item *dbm = janus_config_get(config, cat, janus_config_type_item, "buffermsg");
janus_config_item *dt = janus_config_get(config, cat, janus_config_type_item, "datatype");
janus_config_item *dbm = janus_config_get(config, m, janus_config_type_item, "buffermsg");
janus_config_item *dt = janus_config_get(config, m, janus_config_type_item, "datatype");
janus_config_item *vport2 = janus_config_get(config, m, janus_config_type_item, "port2");
janus_config_item *vport3 = janus_config_get(config, m, janus_config_type_item, "port3");
janus_config_item *vsvc = janus_config_get(config, m, janus_config_type_item, "svc");
Expand Down Expand Up @@ -4682,7 +4683,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi
if(stream->type == JANUS_STREAMING_MEDIA_DATA)
janus_config_add(config, m, janus_config_item_create("datatype", stream->textdata ? "text" : "binary"));
if(stream->buffermsg)
janus_config_add(config, m, janus_config_item_create("databuffermsg", "true"));
janus_config_add(config, m, janus_config_item_create("buffermsg", "true"));
temp = temp->next;
}
}
Expand Down Expand Up @@ -5587,15 +5588,6 @@ void janus_streaming_setup_media(janus_plugin_session *handle) {
}
janus_mutex_unlock(&stream->keyframe.mutex);
}
if(stream->buffermsg) {
JANUS_LOG(LOG_HUGE, "Any recent datachannel message to send? (%s)\n", stream->mid);
janus_mutex_lock(&stream->buffermsg_mutex);
if(stream->last_msg != NULL) {
JANUS_LOG(LOG_HUGE, "Yep!\n");
janus_streaming_relay_rtp_packet(session, stream->last_msg);
}
janus_mutex_unlock(&stream->buffermsg_mutex);
}
/* If this mountpoint has RTCP support, send a PLI */
if(stream->type == JANUS_STREAMING_MEDIA_VIDEO)
janus_streaming_rtcp_pli_send(stream);
Expand Down Expand Up @@ -5679,9 +5671,31 @@ void janus_streaming_data_ready(janus_plugin_session *handle) {
janus_streaming_session *session = (janus_streaming_session *)handle->plugin_handle;
if(!session || g_atomic_int_get(&session->destroyed) || g_atomic_int_get(&session->hangingup))
return;
janus_refcount_increase(&session->ref);
if(g_atomic_int_compare_and_exchange(&session->dataready, 0, 1)) {
JANUS_LOG(LOG_INFO, "[%s-%p] Data channel available\n", JANUS_STREAMING_PACKAGE, handle);
/* Try to send a buffered datachannel message when datachannel is ready */
GList *temp = session->streams;
janus_streaming_session_stream *s;
janus_streaming_rtp_source_stream *stream;
while(temp) {
s = (janus_streaming_session_stream *)temp->data;
stream = s->stream;
if(stream->buffermsg) {
janus_refcount_increase(&stream->ref);
JANUS_LOG(LOG_VERB, "[%s-%p] Trying to send the most recent message (%s)\n", JANUS_STREAMING_PACKAGE, handle, stream->mid);
janus_mutex_lock(&stream->buffermsg_mutex);
if(stream->last_msg != NULL) {
JANUS_LOG(LOG_HUGE, "Buffered datachannel message found!\n");
janus_streaming_relay_rtp_packet(session, stream->last_msg);
}
janus_mutex_unlock(&stream->buffermsg_mutex);
janus_refcount_decrease(&stream->ref);
}
temp = temp->next;
}
}
janus_refcount_decrease(&session->ref);
}

void janus_streaming_hangup_media(janus_plugin_session *handle) {
Expand Down Expand Up @@ -9949,14 +9963,19 @@ static void *janus_streaming_relay_thread(void *data) {
/* Are we keeping track of the last message being relayed? */
if(stream->buffermsg) {
janus_mutex_lock(&stream->buffermsg_mutex);
if(stream->last_msg != NULL) {
janus_streaming_rtp_relay_packet_free((janus_streaming_rtp_relay_packet *)stream->last_msg);
stream->last_msg = NULL;
}
janus_streaming_rtp_relay_packet *pkt = g_malloc0(sizeof(janus_streaming_rtp_relay_packet));
pkt->data = g_malloc(bytes);
memcpy(pkt->data, data, bytes);
packet.mindex = stream->mindex;
packet.is_rtp = FALSE;
packet.is_data = TRUE;
packet.textdata = stream->textdata;
pkt->mindex = stream->mindex;
pkt->is_data = TRUE;
pkt->textdata = stream->textdata;
pkt->length = bytes;
/* Store the latest message */
stream->last_msg = pkt;
janus_mutex_unlock(&stream->buffermsg_mutex);
}
/* Go! */
Expand Down

0 comments on commit 212e4fc

Please sign in to comment.