From 05074f7d6346bc98e2902778c66ad22b8ff4733e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roland=20L=C3=B6tscher?= Date: Wed, 7 Feb 2024 17:52:02 +0100 Subject: [PATCH] Port 'WebSocket Client' demo to Python --- demos/WebSocket Client/main.py | 87 ++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 demos/WebSocket Client/main.py diff --git a/demos/WebSocket Client/main.py b/demos/WebSocket Client/main.py new file mode 100644 index 00000000..e2ebc9e1 --- /dev/null +++ b/demos/WebSocket Client/main.py @@ -0,0 +1,87 @@ +import gi + +gi.require_version("Soup", "3.0") +from gi.repository import GLib, Gio, Soup +import sys +import workbench + +builder = workbench.builder + +button_connect = builder.get_object("button_connect") +button_disconnect = builder.get_object("button_disconnect") +button_send = builder.get_object("button_send") +entry_url = builder.get_object("entry_url") +entry_message = builder.get_object("entry_message") + +connection = None + + +def on_open(): + print("open") + button_connect.set_sensitive(False) + button_disconnect.set_sensitive(True) + button_send.set_sensitive(True) + + +def on_closed(_self): + print("closed") + global connection + connection = None + button_connect.set_sensitive(True) + button_disconnect.set_sensitive(False) + button_send.set_sensitive(False) + + +def on_error(_self, err): + print("error") + print(err, file=sys.stderr) + + +def on_message(_self, data_type, message): + if data_type != Soup.WebsocketDataType.TEXT: + return + str = message.unref_to_array().decode("utf-8") + print("received:", str) + + +def send(message): + connection.send_message( + Soup.WebsocketDataType.TEXT, + GLib.Bytes(message.encode("utf-8")), + ) + print("sent:", message) + + +def on_button_send_clicked(_button): + global message + message = entry_message.get_text() + send(message) + + +def on_connected(session, result): + global connection + connection = session.websocket_connect_finish(result) + connection.connect("closed", on_closed) + connection.connect("error", on_error) + connection.connect("message", on_message) + + on_open() + + +def connect(_button): + session = Soup.Session() + message = Soup.Message( + method="GET", + uri=GLib.Uri.parse(entry_url.get_text(), GLib.UriFlags.NONE), + ) + + session.websocket_connect_async( + message, None, [], GLib.PRIORITY_DEFAULT, None, on_connected + ) + + +button_connect.connect("clicked", connect) +button_disconnect.connect( + "clicked", lambda _: connection.close(Soup.WebsocketCloseCode.NORMAL, None) +) +button_send.connect("clicked", on_button_send_clicked)