Skip to content

Commit

Permalink
feat: Allow to use the Service XMPP connection for jibri. (#732)
Browse files Browse the repository at this point in the history
* ref: Move Jibri IQ handling to XmppServices.

* ref: Add IqProcessingResult.

* ref: Route JibriIqs through the conference, remove extra listeners.

* ref: Send IQ requests to Jibri through JibriDetector's connection.

* feat: Support jibri on the XMPP service connection.
  • Loading branch information
bgrozev authored Apr 28, 2021
1 parent b02dc65 commit bdfc770
Show file tree
Hide file tree
Showing 22 changed files with 343 additions and 129 deletions.
17 changes: 0 additions & 17 deletions src/main/java/org/jitsi/impl/protocol/xmpp/XmppProviderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.jitsi.impl.protocol.xmpp.log.*;
import org.jitsi.jicofo.*;
import org.jitsi.jicofo.discovery.*;
import org.jitsi.jicofo.jibri.*;
import org.jitsi.jicofo.xmpp.*;
import org.jitsi.protocol.xmpp.*;
import org.jitsi.retry.*;
Expand Down Expand Up @@ -61,7 +60,6 @@ public class XmppProviderImpl
* Jingle operation set.
*/
private final OperationSetJingleImpl jingleOpSet;
private final JibriIqHandler jibriIqHandler;

private final Muc muc = new Muc();

Expand Down Expand Up @@ -107,8 +105,6 @@ public XmppProviderImpl(

connection = createXmppConnection();
connectRetry = new RetryStrategy(TaskPools.getScheduledPool());
jibriIqHandler = new JibriIqHandler();
connection.registerIQRequestHandler(jibriIqHandler);
}


Expand Down Expand Up @@ -261,7 +257,6 @@ public void stop()
logger.info("Disconnected.");

connection.unregisterIQRequestHandler(jingleOpSet);
connection.unregisterIQRequestHandler(jibriIqHandler);
connection.removeConnectionListener(connListener);
}

Expand Down Expand Up @@ -339,18 +334,6 @@ public List<String> discoverFeatures(@NotNull EntityFullJid jid)
return DiscoveryUtil.discoverParticipantFeatures(this, jid);
}

@Override
public void addJibriIqHandler(@NotNull JibriSessionIqHandler jibriIqHandler)
{
this.jibriIqHandler.addJibri(jibriIqHandler);
}

@Override
public void removeJibriIqHandler(@NotNull JibriSessionIqHandler jibriIqHandler)
{
this.jibriIqHandler.removeJibri(jibriIqHandler);
}

class XmppConnectionListener
implements ConnectionListener
{
Expand Down
18 changes: 13 additions & 5 deletions src/main/java/org/jitsi/jicofo/FocusManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* @author Boris Grozev
*/
public class FocusManager
implements JitsiMeetConferenceImpl.ConferenceListener
implements JitsiMeetConferenceImpl.ConferenceListener, ConferenceStore
{
/**
* The logger used by this instance.
Expand Down Expand Up @@ -71,6 +71,8 @@ public class FocusManager
*/
private final Map<EntityBareJid, JitsiMeetConferenceImpl> conferences = new ConcurrentHashMap<>();

private final List<JitsiMeetConference> conferencesCache = new CopyOnWriteArrayList<>();

/**
* The set of the IDs of conferences in {@link #conferences}.
*/
Expand Down Expand Up @@ -289,6 +291,7 @@ private JitsiMeetConferenceImpl createConference(
id, includeInStatistics);

conferences.put(room, conference);
conferencesCache.add(conference);
conferenceGids.add(id);
}

Expand Down Expand Up @@ -357,6 +360,7 @@ public void conferenceEnded(JitsiMeetConferenceImpl conference)
synchronized (conferencesSyncRoot)
{
conferences.remove(roomName);
conferencesCache.remove(conference);
conferenceGids.remove(conference.getId());

// It is not clear whether the code below necessarily needs to
Expand Down Expand Up @@ -408,6 +412,7 @@ public void bridgeRemoved()
* @return the {@code JitsiMeetConference} for the specified
* {@code roomName} or {@code null} if no conference has been allocated yet
*/
@Override
public JitsiMeetConferenceImpl getConference(EntityBareJid roomName)
{
synchronized (conferencesSyncRoot)
Expand All @@ -416,6 +421,12 @@ public JitsiMeetConferenceImpl getConference(EntityBareJid roomName)
}
}

@Override
public List<JitsiMeetConference> getAllConferences()
{
return getConferences();
}

/**
* Get the conferences of this Jicofo. Note that the
* List returned is a snapshot of the conference
Expand All @@ -424,10 +435,7 @@ public JitsiMeetConferenceImpl getConference(EntityBareJid roomName)
*/
public List<JitsiMeetConference> getConferences()
{
synchronized (conferencesSyncRoot)
{
return new ArrayList<>(conferences.values());
}
return conferencesCache;
}

private int getNonHealthCheckConferenceCount()
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/jitsi/jicofo/JitsiMeetConference.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.jitsi.impl.protocol.xmpp.*;
import org.jitsi.jicofo.bridge.*;
import org.jitsi.jicofo.jibri.*;
import org.jitsi.jicofo.xmpp.*;
import org.jitsi.jicofo.xmpp.muc.*;
import org.jitsi.xmpp.extensions.jibri.*;
import org.jxmpp.jid.*;

import java.util.*;
Expand Down Expand Up @@ -97,4 +99,5 @@ default JibriSipGateway getJibriSipGateway()
* Whether this conference should be considered when generating statistics.
*/
boolean includeInStatistics();
IqProcessingResult handleJibriRequest(IqRequest<JibriIq> request);
}
24 changes: 22 additions & 2 deletions src/main/java/org/jitsi/jicofo/JitsiMeetConferenceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.jitsi.impl.protocol.xmpp.*;
import org.jitsi.jicofo.bridge.*;
import org.jitsi.jicofo.version.*;
import org.jitsi.jicofo.xmpp.*;
import org.jitsi.jicofo.xmpp.muc.*;
import org.jitsi.utils.*;
import org.jitsi.utils.logging2.*;
import org.jitsi.utils.logging2.Logger;
import org.jitsi.xmpp.extensions.colibri.*;
import org.jitsi.xmpp.extensions.jibri.*;
import org.jitsi.xmpp.extensions.jingle.*;

import org.jitsi.impl.protocol.xmpp.colibri.*;
Expand All @@ -47,6 +49,8 @@
import java.util.logging.*;
import java.util.stream.*;

import static org.jitsi.jicofo.xmpp.IqProcessingResult.*;

/**
* Represents a Jitsi Meet conference. Manages the Jingle sessions with the
* participants, as well as the COLIBRI session with the jitsi-videobridge
Expand Down Expand Up @@ -320,7 +324,6 @@ public void start()
jibriRecorder
= new JibriRecorder(
this,
clientXmppProvider,
jibriDetector,
logger);
}
Expand All @@ -331,7 +334,6 @@ public void start()
jibriSipGateway
= new JibriSipGateway(
this,
clientXmppProvider,
sipJibriDetector,
logger);
}
Expand Down Expand Up @@ -2458,6 +2460,24 @@ public boolean includeInStatistics()
return includeInStatistics;
}

@Override
public IqProcessingResult handleJibriRequest(IqRequest<JibriIq> request)
{
IqProcessingResult result = new NotProcessed();
if (started.get())
{
if (jibriRecorder != null)
{
result = jibriRecorder.handleJibriRequest(request);
}
if (!(result instanceof NotProcessed) && jibriSipGateway != null)
{
result = jibriRecorder.handleJibriRequest(request);
}
}
return result;
}

private FocusManager getFocusManager()
{
return jicofoServices.getFocusManager();
Expand Down
17 changes: 4 additions & 13 deletions src/main/java/org/jitsi/jicofo/jibri/JibriSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,6 @@ static private boolean isStartingStatus(JibriIq.Status status)
*/
private final String applicationData;

/**
* {@link AbstractXMPPConnection} instance used to send/listen for XMPP packets.
*/
private final AbstractXMPPConnection xmpp;

/**
* The maximum amount of retries we'll attempt
*/
Expand Down Expand Up @@ -176,8 +171,6 @@ static private boolean isStartingStatus(JibriIq.Status status)
* @param roomName the name if the XMPP MUC room (full address).
* @param pendingTimeout how many seconds this session can wait in pending
* state, before trying another Jibri instance or failing with an error.
* @param connection the XMPP connection which will be used to send/listen
* for packets.
* @param jibriDetector the Jibri detector which will be used to select
* Jibri instance.
* @param isSIP <tt>true</tt> if it's a SIP session or <tt>false</tt> for
Expand All @@ -198,7 +191,6 @@ static private boolean isStartingStatus(JibriIq.Status status)
Jid initiator,
long pendingTimeout,
int maxNumRetries,
AbstractXMPPConnection connection,
JibriDetector jibriDetector,
boolean isSIP,
String sipAddress,
Expand All @@ -222,7 +214,6 @@ static private boolean isStartingStatus(JibriIq.Status status)
this.youTubeBroadcastId = youTubeBroadcastId;
this.sessionId = sessionId;
this.applicationData = applicationData;
this.xmpp = connection;
jibriDetector.addHandler(jibriEventHandler);
logger = new LoggerImpl(getClass().getName(), logLevelDelegate.getLevel());
}
Expand Down Expand Up @@ -370,7 +361,7 @@ synchronized public void stop(Jid initiator)
// in the processing of the response.
try
{
xmpp.sendIqWithResponseCallback(
jibriDetector.getXmppConnection().sendIqWithResponseCallback(
stopRequest,
stanza -> {
if (stanza instanceof JibriIq) {
Expand All @@ -394,7 +385,7 @@ synchronized public void stop(Jid initiator)
60000);
} catch (SmackException.NotConnectedException | InterruptedException e)
{
logger.error("Error sending stop iq: " + e.toString());
logger.error("Error sending stop iq: " + e, e);
}
}

Expand Down Expand Up @@ -451,7 +442,7 @@ private void processJibriIqFromJibri(JibriIq iq)
}
else
{
logger.error("Received UNDEFINED status from jibri: " + iq.toString());
logger.error("Received UNDEFINED status from jibri: " + iq);
}
}

Expand Down Expand Up @@ -523,7 +514,7 @@ private void sendJibriStartIq(final Jid jibriJid)
// timeout each time.
reschedulePendingTimeout();

IQ reply = UtilKt.sendIqAndGetResponse(xmpp, startIq);
IQ reply = UtilKt.sendIqAndGetResponse(jibriDetector.getXmppConnection(), startIq);

if (!(reply instanceof JibriIq))
{
Expand Down
3 changes: 0 additions & 3 deletions src/main/kotlin/org/jitsi/impl/protocol/xmpp/XmppProvider.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.jitsi.impl.protocol.xmpp

import org.jitsi.jicofo.jibri.JibriSessionIqHandler
import org.jitsi.jicofo.xmpp.XmppConnectionConfig
import org.jitsi.protocol.xmpp.OperationSetJingle
import org.jivesoftware.smack.AbstractXMPPConnection
Expand Down Expand Up @@ -60,8 +59,6 @@ interface XmppProvider {
fun findOrCreateRoom(name: EntityBareJid): ChatRoom

fun discoverFeatures(jid: EntityFullJid): List<String>
fun addJibriIqHandler(jibriIqHandler: JibriSessionIqHandler)
fun removeJibriIqHandler(jibriIqHandler: JibriSessionIqHandler)
fun getStats(): JSONObject

class RoomExistsException(message: String) : Exception(message)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Jicofo, the Jitsi Conference Focus.
*
* Copyright @ 2021-Present 8x8, Inc.
* Copyright @ 2021 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,14 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.jicofo.jibri
package org.jitsi.jicofo

import org.jitsi.xmpp.extensions.jibri.JibriIq
import org.jxmpp.jid.EntityBareJid

/**
* Accepts and handles Jibri IQs for a specific set of Jibri sessions.
*/
interface JibriSessionIqHandler {
fun accept(iq: JibriIq): Boolean
fun handleIQRequest(iq: JibriIq)
interface ConferenceStore {
/** Get a list of all conferences. */
fun getAllConferences(): List<JitsiMeetConference>
/** Get a conference for a specific [Jid] (i.e. name). */
fun getConference(jid: EntityBareJid): JitsiMeetConference?
}

class EmptyConferenceStore : ConferenceStore {
override fun getAllConferences() = emptyList<JitsiMeetConference>()
override fun getConference(jid: EntityBareJid): JitsiMeetConference? = null
}
10 changes: 9 additions & 1 deletion src/main/kotlin/org/jitsi/jicofo/JicofoServices.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.jitsi.jicofo.rest.Application
import org.jitsi.jicofo.version.CurrentVersionImpl
import org.jitsi.jicofo.xmpp.IqHandler
import org.jitsi.jicofo.xmpp.XmppConnectionConfig
import org.jitsi.jicofo.xmpp.XmppConnectionEnum
import org.jitsi.jicofo.xmpp.XmppProviderFactory
import org.jitsi.jicofo.xmpp.XmppServices
import org.jitsi.jicofo.xmpp.initializeSmack
Expand Down Expand Up @@ -82,6 +83,11 @@ open class JicofoServices {

val xmppServices = XmppServices(xmppProviderFactory)

private fun getXmppConnectionByName(name: XmppConnectionEnum) = when (name) {
XmppConnectionEnum.Client -> xmppServices.clientConnection
XmppConnectionEnum.Service -> xmppServices.serviceConnection
}

val bridgeSelector = BridgeSelector()
private val bridgeDetector: BridgeMucDetector? = BridgeConfig.config.breweryJid?.let { breweryJid ->
BridgeMucDetector(xmppServices.serviceConnection, bridgeSelector, breweryJid).apply { init() }
Expand All @@ -90,7 +96,9 @@ open class JicofoServices {
null
}
val jibriDetector = JibriConfig.config.breweryJid?.let { breweryJid ->
JibriDetector(xmppServices.clientConnection, breweryJid, false).apply { init() }
JibriDetector(getXmppConnectionByName(JibriConfig.config.xmppConnectionName), breweryJid, false).apply {
init()
}
} ?: run {
logger.info("No Jibri detector configured.")
null
Expand Down
Loading

0 comments on commit bdfc770

Please sign in to comment.