Skip to content

Commit

Permalink
Batch source-add/source-removes. (#801)
Browse files Browse the repository at this point in the history
* ref: Move logic to handle sources from Conference to Participant.

* feat: Delay sending source-add/source-removes in order to batch them.

* squash: Fix typo in function name.
  • Loading branch information
bgrozev committed Sep 2, 2021
1 parent db0453c commit fbba7ee
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 61 deletions.
67 changes: 6 additions & 61 deletions src/main/java/org/jitsi/jicofo/JitsiMeetConferenceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.jitsi.impl.protocol.xmpp.*;
import org.jitsi.jicofo.auth.*;
import org.jitsi.jicofo.bridge.*;
import org.jitsi.jicofo.conference.*;
import org.jitsi.jicofo.conference.source.*;
import org.jitsi.jicofo.lipsynchack.*;
import org.jitsi.jicofo.util.*;
Expand Down Expand Up @@ -124,8 +123,7 @@ public class JitsiMeetConferenceImpl
private volatile ChatRoom chatRoom;

/**
* Operation set used to handle Jingle sessions with conference
* participants.
* Operation set used to handle Jingle sessions with conference participants.
*/
private OperationSetJingle jingle;

Expand Down Expand Up @@ -1402,24 +1400,7 @@ private void propagateNewSources(Participant sourceOwner, ConferenceSourceMap so

participants.stream()
.filter(otherParticipant -> otherParticipant != sourceOwner)
.forEach(
participant ->
{
if (participant.isSessionEstablished())
{
jingle.sendAddSourceIQ(
finalSources,
participant.getJingleSession(),
ConferenceConfig.config.getUseJsonEncodedSources()
&& participant.supportsJsonEncodedSources());
}
else
{
logger.warn("No jingle session yet for " + participant.getChatMember().getName());

participant.queueRemoteSourcesToAdd(finalSources);
}
});
.forEach(participant -> participant.addRemoteSources(finalSources));
}

/**
Expand Down Expand Up @@ -1694,30 +1675,10 @@ private XMPPError onSessionAcceptInternal(
}
}

// Loop over current participant and send 'source-add' notification
// Propagate [participant]'s sources to the other participants.
propagateNewSources(participant, sourcesAccepted);

boolean encodeSourcesAsJson
= ConferenceConfig.config.getUseJsonEncodedSources() && participant.supportsJsonEncodedSources();
for (SourcesToAddOrRemove sourcesToAddOrRemove : participant.clearQueuedRemoteSourceChanges())
{
AddOrRemove action = sourcesToAddOrRemove.getAction();
ConferenceSourceMap sources = sourcesToAddOrRemove.getSources();
logger.info(
"Sending a queued source-" + action.toString().toLowerCase()
+ " to " + participantId + ", sources:" + sources);
if (action == AddOrRemove.Add)
{
jingle.sendAddSourceIQ(
sourcesToAddOrRemove.getSources(),
jingleSession,
encodeSourcesAsJson);
}
else if (action == AddOrRemove.Remove)
{
jingle.sendRemoveSourceIQ(sourcesToAddOrRemove.getSources(), jingleSession, encodeSourcesAsJson);
}
}
// Now that the Jingle session is ready, signal any sources from other participants to [participant].
participant.sendQueuedRemoteSources();

return null;
}
Expand Down Expand Up @@ -1817,23 +1778,7 @@ private void sendSourceRemove(ConferenceSourceMap sources, Participant except)

participants.stream()
.filter(participant -> participant != except)
.forEach(
participant ->
{
if (participant.isSessionEstablished())
{
jingle.sendRemoveSourceIQ(
finalSources,
participant.getJingleSession(),
ConferenceConfig.config.getUseJsonEncodedSources()
&& participant.supportsJsonEncodedSources());
}
else
{
logger.warn("Remove source: no jingle session for " + participant.getEndpointId());
participant.queueRemoteSourcesToRemove(finalSources);
}
});
.forEach(participant -> participant.removeRemoteSources(finalSources));
}

/**
Expand Down
159 changes: 159 additions & 0 deletions src/main/java/org/jitsi/jicofo/Participant.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.jetbrains.annotations.*;
import org.jitsi.impl.protocol.xmpp.*;
import org.jitsi.jicofo.conference.*;
import org.jitsi.jicofo.conference.source.*;
import org.jitsi.jicofo.xmpp.muc.*;
import org.jitsi.utils.*;
Expand All @@ -33,6 +34,7 @@

import java.time.*;
import java.util.*;
import java.util.concurrent.*;

import static java.time.temporal.ChronoUnit.SECONDS;

Expand Down Expand Up @@ -110,6 +112,17 @@ public static String getEndpointId(ChatRoomMember chatRoomMember)
*/
private final JitsiMeetConferenceImpl conference;

/**
* The task, if any, currently scheduled to signal queued remote sources.
*/
private ScheduledFuture<?> signalQueuedSourcesTask;

/**
* The lock used when queueing remote sources to be signaled with a delay, i.e. when setting
* {@link #signalQueuedSourcesTask}.
*/
private final Object signalQueuedSourcesTaskSyncRoot = new Object();

/**
* Creates new {@link Participant} for given chat room member.
*
Expand All @@ -126,6 +139,7 @@ public Participant(
this.conference = conference;
this.roomMember = roomMember;
this.logger = parentLogger.createChildLogger(getClass().getName());
logger.addContext("participant", getEndpointId());
}

/**
Expand Down Expand Up @@ -521,6 +535,151 @@ public void setMuted(MediaType mediaType, boolean value)
}
}

/**
* Add a set of remote sources, which are to be signaled to the remote side. The sources may be signaled
* immediately, or queued to be signaled later.
* @param sources the sources to add.
*/
public void addRemoteSources(ConferenceSourceMap sources)
{
if (!isSessionEstablished())
{
logger.debug("No Jingle session yet, queueing source-add.");
queueRemoteSourcesToAdd(sources);
// No need to schedule, the sources will be signaled when the session is established.
return;
}

int delayMs = ConferenceConfig.config.getSourceSignalingDelayMs(conference.getParticipantCount());
if (delayMs > 0)
{
synchronized (signalQueuedSourcesTaskSyncRoot)
{
queueRemoteSourcesToAdd(sources);
scheduleSignalingOfQueuedSources(delayMs);
}
}
else
{
OperationSetJingle jingle = conference.getJingle();
if (jingle == null)
{
logger.error("Can not send Jingle source-add, no Jingle API available.");
return;
}
jingle.sendAddSourceIQ(
sources,
getJingleSession(),
ConferenceConfig.config.getUseJsonEncodedSources() && supportsJsonEncodedSources());
}
}

/**
* Schedule a task to signal all queued remote sources to the remote side. If a task is already scheduled, does
* not schedule a new one (the existing task will send all latest queued sources).
* @param delayMs the delay in milliseconds after which the task is to execute.
*/
private void scheduleSignalingOfQueuedSources(int delayMs)
{
synchronized (signalQueuedSourcesTaskSyncRoot)
{
if (signalQueuedSourcesTask == null)
{
logger.debug("Scheduling a task to signal queued remote sources after " + delayMs + " ms.");
signalQueuedSourcesTask = TaskPools.getScheduledPool().schedule(() ->
{
synchronized (signalQueuedSourcesTaskSyncRoot)
{
sendQueuedRemoteSources();
signalQueuedSourcesTask = null;
}
},
delayMs,
TimeUnit.MILLISECONDS);
}
}
}


/**
* Removee a set of remote sources, which are to be signaled as removed to the remote side. The sources may be
* signaled immediately, or queued to be signaled later.
* @param sources the sources to remove.
*/
public void removeRemoteSources(ConferenceSourceMap sources)
{
if (!isSessionEstablished())
{
logger.debug("No Jingle session yet, queueing source-remove.");
queueRemoteSourcesToRemove(sources);
// No need to schedule, the sources will be signaled when the session is established.
return;
}

int delayMs = ConferenceConfig.config.getSourceSignalingDelayMs(conference.getParticipantCount());
if (delayMs > 0)
{
synchronized (signalQueuedSourcesTaskSyncRoot)
{
queueRemoteSourcesToRemove(sources);
scheduleSignalingOfQueuedSources(delayMs);
}
}
else
{
OperationSetJingle jingle = conference.getJingle();
if (jingle == null)
{
logger.error("Can not send Jingle source-remove, no Jingle API available.");
return;
}
jingle.sendRemoveSourceIQ(
sources,
getJingleSession(),
ConferenceConfig.config.getUseJsonEncodedSources() && supportsJsonEncodedSources());
}
}

/**
* Signal any queued remote source modifications (either addition or removal) to the remote side.
*/
public void sendQueuedRemoteSources()
{
OperationSetJingle jingle = conference.getJingle();
if (jingle == null)
{
logger.error("Can not signal remote sources, no Jingle API available");
return;
}

if (!isSessionEstablished())
{
logger.warn("Can not singal remote sources, Jingle session not established.");
return;
}

boolean encodeSourcesAsJson
= ConferenceConfig.config.getUseJsonEncodedSources() && supportsJsonEncodedSources();

for (SourcesToAddOrRemove sourcesToAddOrRemove : clearQueuedRemoteSourceChanges())
{
AddOrRemove action = sourcesToAddOrRemove.getAction();
ConferenceSourceMap sources = sourcesToAddOrRemove.getSources();
logger.info("Sending a queued source-" + action.toString().toLowerCase() + ", sources:" + sources);
if (action == AddOrRemove.Add)
{
jingle.sendAddSourceIQ(
sourcesToAddOrRemove.getSources(),
jingleSession,
encodeSourcesAsJson);
}
else if (action == AddOrRemove.Remove)
{
jingle.sendRemoveSourceIQ(sourcesToAddOrRemove.getSources(), jingleSession, encodeSourcesAsJson);
}
}
}

/**
* Checks whether the participant is muted.
* @param mediaType the media type to check.
Expand Down
17 changes: 17 additions & 0 deletions src/main/kotlin/org/jitsi/jicofo/ConferenceConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
*/
package org.jitsi.jicofo

import com.typesafe.config.ConfigObject
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import org.jitsi.config.JitsiConfig.Companion.legacyConfig
import org.jitsi.config.JitsiConfig.Companion.newConfig
import org.jitsi.metaconfig.config
import java.time.Duration
import java.util.TreeMap

@SuppressFBWarnings(value = ["BX_UNBOXING_IMMEDIATELY_REBOXED"], justification = "False positive.")
class ConferenceConfig {
val conferenceStartTimeout: Duration by config {
"org.jitsi.focus.IDLE_TIMEOUT".from(legacyConfig)
Expand Down Expand Up @@ -72,6 +76,19 @@ class ConferenceConfig {
}
fun useRandomSharedDocumentName(): Boolean = useRandomSharedDocumentName

private val sourceSignalingDelays: TreeMap<Int, Int> by config {
"jicofo.conference.source-signaling-delays".from(newConfig)
.convertFrom<ConfigObject> { cfg ->
TreeMap(cfg.entries.associate { it.key.toInt() to it.value.unwrapped() as Int })
}
}

/**
* Get the number of milliseconds to delay signaling of Jingle sources given a certain [conferenceSize].
*/
fun getSourceSignalingDelayMs(conferenceSize: Int) =
sourceSignalingDelays.floorEntry(conferenceSize)?.value ?: 0

/**
* Whether to strip simulcast streams when signaling receivers. This option requires that jitsi-videobridge
* uses the first SSRC in the SIM group as the target SSRC when rewriting streams, as this is the only SSRC
Expand Down
10 changes: 10 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ jicofo {
// If `true` the shared document uses a random name. Otherwise, it uses the conference name.
use-random-name = false
}

// How much to delay signaling Jingle source-add and source-remove in order to batch them and reduce the number of
// messages (based on conference size). The values are in milliseconds.
source-signaling-delays = {
// Conferences with size <50 will have delay=0.
// Conferences with size between 50 and 99 will have delay=500 ms.
#50 = 500
// Conferences with size >=100 have delay=1000 ms.
#100 = 1000
}
}

// Configuration for the internal health checks performed by jicofo.
Expand Down
Loading

0 comments on commit fbba7ee

Please sign in to comment.