diff --git a/src/main/java/org/jitsi/jicofo/JitsiMeetConferenceImpl.java b/src/main/java/org/jitsi/jicofo/JitsiMeetConferenceImpl.java index baf4c0f4e1..247c33f72a 100644 --- a/src/main/java/org/jitsi/jicofo/JitsiMeetConferenceImpl.java +++ b/src/main/java/org/jitsi/jicofo/JitsiMeetConferenceImpl.java @@ -23,7 +23,6 @@ import org.jitsi.impl.protocol.xmpp.*; import org.jitsi.jicofo.auth.*; import org.jitsi.jicofo.bridge.*; -import org.jitsi.jicofo.conference.*; import org.jitsi.jicofo.conference.source.*; import org.jitsi.jicofo.lipsynchack.*; import org.jitsi.jicofo.util.*; @@ -124,8 +123,7 @@ public class JitsiMeetConferenceImpl private volatile ChatRoom chatRoom; /** - * Operation set used to handle Jingle sessions with conference - * participants. + * Operation set used to handle Jingle sessions with conference participants. */ private OperationSetJingle jingle; @@ -1402,24 +1400,7 @@ private void propagateNewSources(Participant sourceOwner, ConferenceSourceMap so participants.stream() .filter(otherParticipant -> otherParticipant != sourceOwner) - .forEach( - participant -> - { - if (participant.isSessionEstablished()) - { - jingle.sendAddSourceIQ( - finalSources, - participant.getJingleSession(), - ConferenceConfig.config.getUseJsonEncodedSources() - && participant.supportsJsonEncodedSources()); - } - else - { - logger.warn("No jingle session yet for " + participant.getChatMember().getName()); - - participant.queueRemoteSourcesToAdd(finalSources); - } - }); + .forEach(participant -> participant.addRemoteSources(finalSources)); } /** @@ -1694,30 +1675,10 @@ private XMPPError onSessionAcceptInternal( } } - // Loop over current participant and send 'source-add' notification + // Propagate [participant]'s sources to the other participants. propagateNewSources(participant, sourcesAccepted); - - boolean encodeSourcesAsJson - = ConferenceConfig.config.getUseJsonEncodedSources() && participant.supportsJsonEncodedSources(); - for (SourcesToAddOrRemove sourcesToAddOrRemove : participant.clearQueuedRemoteSourceChanges()) - { - AddOrRemove action = sourcesToAddOrRemove.getAction(); - ConferenceSourceMap sources = sourcesToAddOrRemove.getSources(); - logger.info( - "Sending a queued source-" + action.toString().toLowerCase() - + " to " + participantId + ", sources:" + sources); - if (action == AddOrRemove.Add) - { - jingle.sendAddSourceIQ( - sourcesToAddOrRemove.getSources(), - jingleSession, - encodeSourcesAsJson); - } - else if (action == AddOrRemove.Remove) - { - jingle.sendRemoveSourceIQ(sourcesToAddOrRemove.getSources(), jingleSession, encodeSourcesAsJson); - } - } + // Now that the Jingle session is ready, signal any sources from other participants to [participant]. + participant.sendQueuedRemoteSources(); return null; } @@ -1817,23 +1778,7 @@ private void sendSourceRemove(ConferenceSourceMap sources, Participant except) participants.stream() .filter(participant -> participant != except) - .forEach( - participant -> - { - if (participant.isSessionEstablished()) - { - jingle.sendRemoveSourceIQ( - finalSources, - participant.getJingleSession(), - ConferenceConfig.config.getUseJsonEncodedSources() - && participant.supportsJsonEncodedSources()); - } - else - { - logger.warn("Remove source: no jingle session for " + participant.getEndpointId()); - participant.queueRemoteSourcesToRemove(finalSources); - } - }); + .forEach(participant -> participant.removeRemoteSources(finalSources)); } /** diff --git a/src/main/java/org/jitsi/jicofo/Participant.java b/src/main/java/org/jitsi/jicofo/Participant.java index 64cae42b31..0bdded7f4b 100644 --- a/src/main/java/org/jitsi/jicofo/Participant.java +++ b/src/main/java/org/jitsi/jicofo/Participant.java @@ -19,6 +19,7 @@ import org.jetbrains.annotations.*; import org.jitsi.impl.protocol.xmpp.*; +import org.jitsi.jicofo.conference.*; import org.jitsi.jicofo.conference.source.*; import org.jitsi.jicofo.xmpp.muc.*; import org.jitsi.utils.*; @@ -33,6 +34,7 @@ import java.time.*; import java.util.*; +import java.util.concurrent.*; import static java.time.temporal.ChronoUnit.SECONDS; @@ -110,6 +112,17 @@ public static String getEndpointId(ChatRoomMember chatRoomMember) */ private final JitsiMeetConferenceImpl conference; + /** + * The task, if any, currently scheduled to signal queued remote sources. + */ + private ScheduledFuture signalQueuedSourcesTask; + + /** + * The lock used when queueing remote sources to be signaled with a delay, i.e. when setting + * {@link #signalQueuedSourcesTask}. + */ + private final Object signalQueuedSourcesTaskSyncRoot = new Object(); + /** * Creates new {@link Participant} for given chat room member. * @@ -126,6 +139,7 @@ public Participant( this.conference = conference; this.roomMember = roomMember; this.logger = parentLogger.createChildLogger(getClass().getName()); + logger.addContext("participant", getEndpointId()); } /** @@ -521,6 +535,151 @@ public void setMuted(MediaType mediaType, boolean value) } } + /** + * Add a set of remote sources, which are to be signaled to the remote side. The sources may be signaled + * immediately, or queued to be signaled later. + * @param sources the sources to add. + */ + public void addRemoteSources(ConferenceSourceMap sources) + { + if (!isSessionEstablished()) + { + logger.debug("No Jingle session yet, queueing source-add."); + queueRemoteSourcesToAdd(sources); + // No need to schedule, the sources will be signaled when the session is established. + return; + } + + int delayMs = ConferenceConfig.config.getSourceSignalingDelayMs(conference.getParticipantCount()); + if (delayMs > 0) + { + synchronized (signalQueuedSourcesTaskSyncRoot) + { + queueRemoteSourcesToAdd(sources); + scheduleSignalingOfQueuedSources(delayMs); + } + } + else + { + OperationSetJingle jingle = conference.getJingle(); + if (jingle == null) + { + logger.error("Can not send Jingle source-add, no Jingle API available."); + return; + } + jingle.sendAddSourceIQ( + sources, + getJingleSession(), + ConferenceConfig.config.getUseJsonEncodedSources() && supportsJsonEncodedSources()); + } + } + + /** + * Schedule a task to signal all queued remote sources to the remote side. If a task is already scheduled, does + * not schedule a new one (the existing task will send all latest queued sources). + * @param delayMs the delay in milliseconds after which the task is to execute. + */ + private void scheduleSignalingOfQueuedSources(int delayMs) + { + synchronized (signalQueuedSourcesTaskSyncRoot) + { + if (signalQueuedSourcesTask == null) + { + logger.debug("Scheduling a task to signal queued remote sources after " + delayMs + " ms."); + signalQueuedSourcesTask = TaskPools.getScheduledPool().schedule(() -> + { + synchronized (signalQueuedSourcesTaskSyncRoot) + { + sendQueuedRemoteSources(); + signalQueuedSourcesTask = null; + } + }, + delayMs, + TimeUnit.MILLISECONDS); + } + } + } + + + /** + * Removee a set of remote sources, which are to be signaled as removed to the remote side. The sources may be + * signaled immediately, or queued to be signaled later. + * @param sources the sources to remove. + */ + public void removeRemoteSources(ConferenceSourceMap sources) + { + if (!isSessionEstablished()) + { + logger.debug("No Jingle session yet, queueing source-remove."); + queueRemoteSourcesToRemove(sources); + // No need to schedule, the sources will be signaled when the session is established. + return; + } + + int delayMs = ConferenceConfig.config.getSourceSignalingDelayMs(conference.getParticipantCount()); + if (delayMs > 0) + { + synchronized (signalQueuedSourcesTaskSyncRoot) + { + queueRemoteSourcesToRemove(sources); + scheduleSignalingOfQueuedSources(delayMs); + } + } + else + { + OperationSetJingle jingle = conference.getJingle(); + if (jingle == null) + { + logger.error("Can not send Jingle source-remove, no Jingle API available."); + return; + } + jingle.sendRemoveSourceIQ( + sources, + getJingleSession(), + ConferenceConfig.config.getUseJsonEncodedSources() && supportsJsonEncodedSources()); + } + } + + /** + * Signal any queued remote source modifications (either addition or removal) to the remote side. + */ + public void sendQueuedRemoteSources() + { + OperationSetJingle jingle = conference.getJingle(); + if (jingle == null) + { + logger.error("Can not signal remote sources, no Jingle API available"); + return; + } + + if (!isSessionEstablished()) + { + logger.warn("Can not singal remote sources, Jingle session not established."); + return; + } + + boolean encodeSourcesAsJson + = ConferenceConfig.config.getUseJsonEncodedSources() && supportsJsonEncodedSources(); + + for (SourcesToAddOrRemove sourcesToAddOrRemove : clearQueuedRemoteSourceChanges()) + { + AddOrRemove action = sourcesToAddOrRemove.getAction(); + ConferenceSourceMap sources = sourcesToAddOrRemove.getSources(); + logger.info("Sending a queued source-" + action.toString().toLowerCase() + ", sources:" + sources); + if (action == AddOrRemove.Add) + { + jingle.sendAddSourceIQ( + sourcesToAddOrRemove.getSources(), + jingleSession, + encodeSourcesAsJson); + } + else if (action == AddOrRemove.Remove) + { + jingle.sendRemoveSourceIQ(sourcesToAddOrRemove.getSources(), jingleSession, encodeSourcesAsJson); + } + } + } + /** * Checks whether the participant is muted. * @param mediaType the media type to check. diff --git a/src/main/kotlin/org/jitsi/jicofo/ConferenceConfig.kt b/src/main/kotlin/org/jitsi/jicofo/ConferenceConfig.kt index 2081e0911a..6c62b0e65d 100644 --- a/src/main/kotlin/org/jitsi/jicofo/ConferenceConfig.kt +++ b/src/main/kotlin/org/jitsi/jicofo/ConferenceConfig.kt @@ -17,11 +17,15 @@ */ package org.jitsi.jicofo +import com.typesafe.config.ConfigObject +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import org.jitsi.config.JitsiConfig.Companion.legacyConfig import org.jitsi.config.JitsiConfig.Companion.newConfig import org.jitsi.metaconfig.config import java.time.Duration +import java.util.TreeMap +@SuppressFBWarnings(value = ["BX_UNBOXING_IMMEDIATELY_REBOXED"], justification = "False positive.") class ConferenceConfig { val conferenceStartTimeout: Duration by config { "org.jitsi.focus.IDLE_TIMEOUT".from(legacyConfig) @@ -72,6 +76,19 @@ class ConferenceConfig { } fun useRandomSharedDocumentName(): Boolean = useRandomSharedDocumentName + private val sourceSignalingDelays: TreeMap by config { + "jicofo.conference.source-signaling-delays".from(newConfig) + .convertFrom { cfg -> + TreeMap(cfg.entries.associate { it.key.toInt() to it.value.unwrapped() as Int }) + } + } + + /** + * Get the number of milliseconds to delay signaling of Jingle sources given a certain [conferenceSize]. + */ + fun getSourceSignalingDelayMs(conferenceSize: Int) = + sourceSignalingDelays.floorEntry(conferenceSize)?.value ?: 0 + /** * Whether to strip simulcast streams when signaling receivers. This option requires that jitsi-videobridge * uses the first SSRC in the SIM group as the target SSRC when rewriting streams, as this is the only SSRC diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 2193de55ac..83dfb4b3e7 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -191,6 +191,16 @@ jicofo { // If `true` the shared document uses a random name. Otherwise, it uses the conference name. use-random-name = false } + + // How much to delay signaling Jingle source-add and source-remove in order to batch them and reduce the number of + // messages (based on conference size). The values are in milliseconds. + source-signaling-delays = { + // Conferences with size <50 will have delay=0. + // Conferences with size between 50 and 99 will have delay=500 ms. + #50 = 500 + // Conferences with size >=100 have delay=1000 ms. + #100 = 1000 + } } // Configuration for the internal health checks performed by jicofo. diff --git a/src/test/kotlin/org/jitsi/jicofo/ConferenceConfigTest.kt b/src/test/kotlin/org/jitsi/jicofo/ConferenceConfigTest.kt new file mode 100644 index 0000000000..f5b5b580fe --- /dev/null +++ b/src/test/kotlin/org/jitsi/jicofo/ConferenceConfigTest.kt @@ -0,0 +1,56 @@ +/* + * Jicofo, the Jitsi Conference Focus. + * + * Copyright @ 2020 - present 8x8, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.jicofo + +import io.kotest.matchers.shouldBe + +class ConferenceConfigTest : ConfigTest() { + init { + context("source-signaling-delays") { + context("With no delay configured") { + val config = ConferenceConfig() + for (i in 0..100) { + config.getSourceSignalingDelayMs(i) shouldBe 0 + } + } + context("With delay configured") { + withNewConfig( + """ + jicofo.conference.source-signaling-delays { + // Intentionally out of order + 200 = 1000, + 100 = 500, + 300 = 2000 + } + """.trimIndent() + ) { + val config = ConferenceConfig() + config.getSourceSignalingDelayMs(0) shouldBe 0 + config.getSourceSignalingDelayMs(50) shouldBe 0 + config.getSourceSignalingDelayMs(99) shouldBe 0 + config.getSourceSignalingDelayMs(100) shouldBe 500 + config.getSourceSignalingDelayMs(199) shouldBe 500 + config.getSourceSignalingDelayMs(200) shouldBe 1000 + config.getSourceSignalingDelayMs(299) shouldBe 1000 + config.getSourceSignalingDelayMs(300) shouldBe 2000 + config.getSourceSignalingDelayMs(5000) shouldBe 2000 + } + } + } + } +}