Skip to content

Commit

Permalink
move start_pipeline to asyncio executor, handle bus_call messages
Browse files Browse the repository at this point in the history
  • Loading branch information
danisla committed Sep 8, 2023
1 parent cb769fe commit 162a901
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions src/selkies_gstreamer/gstwebrtc_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1061,23 +1061,23 @@ def __send_ice(self, webrtcbin, mlineindex, candidate):
loop = asyncio.new_event_loop()
loop.run_until_complete(self.on_ice(mlineindex, candidate))

def bus_call(self, bus, message, loop):
def bus_call(self, message):
t = message.type
if t == Gst.MessageType.EOS:
logger.error("End-of-stream\n")
loop.quit()
return False
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
logger.error("Error: %s: %s\n" % (err, debug))
loop.quit()
return False
elif t == Gst.MessageType.STATE_CHANGED:
if isinstance(message.src, Gst.Pipeline):
old_state, new_state, pending_state = message.parse_state_changed()
logger.info(("Pipeline state changed from %s to %s." %
(old_state.value_nick, new_state.value_nick)))
if (old_state.value_nick == "paused" and new_state.value_nick == "ready"):
logger.info("stopping bus message loop")
loop.quit()
return False
elif t == Gst.MessageType.LATENCY:
if self.pipeline:
try:
Expand All @@ -1099,11 +1099,6 @@ def start_pipeline(self):
self.build_webrtcbin_pipeline()
self.build_video_pipeline()

loop = GLib.MainLoop()
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect("message", self.bus_call, loop)

if self.audio:
self.build_audio_pipeline()

Expand All @@ -1130,7 +1125,16 @@ def start_pipeline(self):
transceiver.set_property("do-nack", True)

logger.info("pipeline started")
loop.run()

# Start bus call loop
self.bus = self.pipeline.get_bus()
while self.bus is not None and self.pipeline is not None:
while self.bus.have_pending():
msg = self.bus.pop()
if not self.bus_call(msg):
self.bus = None
break
time.sleep(0.1)

def stop_pipeline(self):
logger.info("stopping pipeline")
Expand Down

0 comments on commit 162a901

Please sign in to comment.