Skip to content

Commit

Permalink
add bus message handler to pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
danisla committed Sep 8, 2023
1 parent 3889685 commit cb769fe
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/selkies_gstreamer/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ def on_session_handler(meta=None):
on_resize_handler(meta["res"])
if meta["scale"]:
on_scaling_ratio_handler(meta["scale"])
app.start_pipeline()
loop.run_in_executor(None, lambda: app.start_pipeline())
signalling.on_session = on_session_handler

# Initialize the Xinput instance
Expand Down
37 changes: 35 additions & 2 deletions src/selkies_gstreamer/gstwebrtc_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import json
import logging
import re
import time

import gi
gi.require_version("Gst", "1.0")
gi.require_version('GstWebRTC', '1.0')
gi.require_version('GstSdp', '1.0')
gi.require_version('GLib', '2.0')
from gi.repository import GLib
from gi.repository import Gst
from gi.repository import GstWebRTC
from gi.repository import GstSdp
Expand Down Expand Up @@ -1058,6 +1061,32 @@ def __send_ice(self, webrtcbin, mlineindex, candidate):
loop = asyncio.new_event_loop()
loop.run_until_complete(self.on_ice(mlineindex, candidate))

def bus_call(self, bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
logger.error("End-of-stream\n")
loop.quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
logger.error("Error: %s: %s\n" % (err, debug))
loop.quit()
elif t == Gst.MessageType.STATE_CHANGED:
if isinstance(message.src, Gst.Pipeline):
old_state, new_state, pending_state = message.parse_state_changed()
logger.info(("Pipeline state changed from %s to %s." %
(old_state.value_nick, new_state.value_nick)))
if (old_state.value_nick == "paused" and new_state.value_nick == "ready"):
logger.info("stopping bus message loop")
loop.quit()
elif t == Gst.MessageType.LATENCY:
if self.pipeline:
try:
self.pipeline.recalculate_latency()
except Exception as e:
logger.warning("failed to recalculate warning, exception: %s" % str(e))

return True

def start_pipeline(self):
"""Starts the GStreamer pipeline
"""
Expand All @@ -1070,6 +1099,11 @@ def start_pipeline(self):
self.build_webrtcbin_pipeline()
self.build_video_pipeline()

loop = GLib.MainLoop()
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect("message", self.bus_call, loop)

if self.audio:
self.build_audio_pipeline()

Expand All @@ -1096,6 +1130,7 @@ def start_pipeline(self):
transceiver.set_property("do-nack", True)

logger.info("pipeline started")
loop.run()

def stop_pipeline(self):
logger.info("stopping pipeline")
Expand All @@ -1106,12 +1141,10 @@ def stop_pipeline(self):
if self.pipeline:
logger.info("setting pipeline state to NULL")
self.pipeline.set_state(Gst.State.NULL)
self.pipeline.unparent()
self.pipeline = None
logger.info("pipeline set to state NULL")
if self.webrtcbin:
self.webrtcbin.set_state(Gst.State.NULL)
self.webrtcbin.unparent()
self.webrtcbin = None
logger.info("webrtcbin set to state NULL")
logger.info("pipeline stopped")

0 comments on commit cb769fe

Please sign in to comment.