diff --git a/src/selkies_gstreamer/gstwebrtc_app.py b/src/selkies_gstreamer/gstwebrtc_app.py index a68e7536..f80a6b2b 100644 --- a/src/selkies_gstreamer/gstwebrtc_app.py +++ b/src/selkies_gstreamer/gstwebrtc_app.py @@ -1061,15 +1061,15 @@ def __send_ice(self, webrtcbin, mlineindex, candidate): loop = asyncio.new_event_loop() loop.run_until_complete(self.on_ice(mlineindex, candidate)) - def bus_call(self, bus, message, loop): + def bus_call(self, message): t = message.type if t == Gst.MessageType.EOS: logger.error("End-of-stream\n") - loop.quit() + return False elif t == Gst.MessageType.ERROR: err, debug = message.parse_error() logger.error("Error: %s: %s\n" % (err, debug)) - loop.quit() + return False elif t == Gst.MessageType.STATE_CHANGED: if isinstance(message.src, Gst.Pipeline): old_state, new_state, pending_state = message.parse_state_changed() @@ -1077,7 +1077,7 @@ def bus_call(self, bus, message, loop): (old_state.value_nick, new_state.value_nick))) if (old_state.value_nick == "paused" and new_state.value_nick == "ready"): logger.info("stopping bus message loop") - loop.quit() + return False elif t == Gst.MessageType.LATENCY: if self.pipeline: try: @@ -1099,11 +1099,6 @@ def start_pipeline(self): self.build_webrtcbin_pipeline() self.build_video_pipeline() - loop = GLib.MainLoop() - self.bus = self.pipeline.get_bus() - self.bus.add_signal_watch() - self.bus.connect("message", self.bus_call, loop) - if self.audio: self.build_audio_pipeline() @@ -1130,7 +1125,16 @@ def start_pipeline(self): transceiver.set_property("do-nack", True) logger.info("pipeline started") - loop.run() + + # Start bus call loop + self.bus = self.pipeline.get_bus() + while self.bus is not None and self.pipeline is not None: + while self.bus.have_pending(): + msg = self.bus.pop() + if not self.bus_call(msg): + self.bus = None + break + time.sleep(0.1) def stop_pipeline(self): logger.info("stopping pipeline")