diff --git a/src/selkies_gstreamer/__main__.py b/src/selkies_gstreamer/__main__.py index 1c860f65..bb9acd4a 100644 --- a/src/selkies_gstreamer/__main__.py +++ b/src/selkies_gstreamer/__main__.py @@ -569,7 +569,7 @@ def on_session_handler(meta=None): on_resize_handler(meta["res"]) if meta["scale"]: on_scaling_ratio_handler(meta["scale"]) - app.start_pipeline() + loop.run_in_executor(None, lambda: app.start_pipeline()) signalling.on_session = on_session_handler # Initialize the Xinput instance diff --git a/src/selkies_gstreamer/gstwebrtc_app.py b/src/selkies_gstreamer/gstwebrtc_app.py index 33178535..a68e7536 100644 --- a/src/selkies_gstreamer/gstwebrtc_app.py +++ b/src/selkies_gstreamer/gstwebrtc_app.py @@ -24,11 +24,14 @@ import json import logging import re +import time import gi gi.require_version("Gst", "1.0") gi.require_version('GstWebRTC', '1.0') gi.require_version('GstSdp', '1.0') +gi.require_version('GLib', '2.0') +from gi.repository import GLib from gi.repository import Gst from gi.repository import GstWebRTC from gi.repository import GstSdp @@ -1058,6 +1061,32 @@ def __send_ice(self, webrtcbin, mlineindex, candidate): loop = asyncio.new_event_loop() loop.run_until_complete(self.on_ice(mlineindex, candidate)) + def bus_call(self, bus, message, loop): + t = message.type + if t == Gst.MessageType.EOS: + logger.error("End-of-stream\n") + loop.quit() + elif t == Gst.MessageType.ERROR: + err, debug = message.parse_error() + logger.error("Error: %s: %s\n" % (err, debug)) + loop.quit() + elif t == Gst.MessageType.STATE_CHANGED: + if isinstance(message.src, Gst.Pipeline): + old_state, new_state, pending_state = message.parse_state_changed() + logger.info(("Pipeline state changed from %s to %s." % + (old_state.value_nick, new_state.value_nick))) + if (old_state.value_nick == "paused" and new_state.value_nick == "ready"): + logger.info("stopping bus message loop") + loop.quit() + elif t == Gst.MessageType.LATENCY: + if self.pipeline: + try: + self.pipeline.recalculate_latency() + except Exception as e: + logger.warning("failed to recalculate warning, exception: %s" % str(e)) + + return True + def start_pipeline(self): """Starts the GStreamer pipeline """ @@ -1070,6 +1099,11 @@ def start_pipeline(self): self.build_webrtcbin_pipeline() self.build_video_pipeline() + loop = GLib.MainLoop() + self.bus = self.pipeline.get_bus() + self.bus.add_signal_watch() + self.bus.connect("message", self.bus_call, loop) + if self.audio: self.build_audio_pipeline() @@ -1096,6 +1130,7 @@ def start_pipeline(self): transceiver.set_property("do-nack", True) logger.info("pipeline started") + loop.run() def stop_pipeline(self): logger.info("stopping pipeline") @@ -1106,12 +1141,10 @@ def stop_pipeline(self): if self.pipeline: logger.info("setting pipeline state to NULL") self.pipeline.set_state(Gst.State.NULL) - self.pipeline.unparent() self.pipeline = None logger.info("pipeline set to state NULL") if self.webrtcbin: self.webrtcbin.set_state(Gst.State.NULL) - self.webrtcbin.unparent() self.webrtcbin = None logger.info("webrtcbin set to state NULL") logger.info("pipeline stopped")