diff --git a/documentation/developers/docstring/README.md b/documentation/developers/docstring/README.md index 940b23641..2b3f281c5 100644 --- a/documentation/developers/docstring/README.md +++ b/documentation/developers/docstring/README.md @@ -11,6 +11,9 @@ * [\_\_init\_\_](#__init__) * [run\_configure\_audio](#run_configure_audio) * [components](#components) +* [components.mqtt.utils](#components.mqtt.utils) +* [components.mqtt.mqtt\_command\_alias](#components.mqtt.mqtt_command_alias) +* [components.mqtt.mqtt\_const](#components.mqtt.mqtt_const) * [components.mqtt](#components.mqtt) * [MQTT](#components.mqtt.MQTT) * [run](#components.mqtt.MQTT.run) @@ -506,6 +509,24 @@ For more information see [Audio Configuration](../../builders/audio.md#audio-con # components + + +# components.mqtt.utils + + + +# components.mqtt.mqtt\_command\_alias + +This file provides definitions for MQTT to RPC command aliases + +See [] +See [RPC Commands](../../builders/rpc-commands.md) + + + + +# components.mqtt.mqtt\_const + # components.mqtt @@ -521,9 +542,6 @@ MQTT Plugin Package. class MQTT(threading.Thread) ``` -A thread for monitoring the events and publish intersting events via mqtt. - - #### run diff --git a/src/jukebox/components/mqtt/__init__.py b/src/jukebox/components/mqtt/__init__.py index 7d2ac8959..1be6b477a 100644 --- a/src/jukebox/components/mqtt/__init__.py +++ b/src/jukebox/components/mqtt/__init__.py @@ -2,22 +2,27 @@ import json import logging +import threading import time from typing import Any + +import paho.mqtt.client as paho_mqtt +from components.rpc_command_alias import cmd_alias_definitions + import jukebox.cfghandler import jukebox.plugs as plugs import jukebox.publishing import jukebox.publishing.server import jukebox.publishing.subscriber -import threading -import paho.mqtt.client as paho_mqtt +from .mqtt_command_alias import legacy_mqtt_cmd, mqtt_cmd +from .mqtt_const import Mqtt_Attributes, topics_to_send -logger = logging.getLogger('jb.mqtt') -cfg = jukebox.cfghandler.get_handler('jukebox') +logger = logging.getLogger("jb.mqtt") +cfg = jukebox.cfghandler.get_handler("jukebox") -base_topic = cfg.setndefault('mqtt', 'base_topic', value="phoniebox-dev") -topics_to_send = ["volume.level", "playerstatus"] +base_topic = cfg.setndefault("mqtt", "base_topic", value="phoniebox-dev") +legacy_support_enabled = cfg.setndefault("mqtt", "enable_legacy", value=True) REPEAT_MODE_OFF = "off" REPEAT_MODE_SINGLE = "single" @@ -25,101 +30,178 @@ def _get_current_time_milli(): - return int(round(time.time() * 1000)) + return int(round(time.time() * float(1000))) + + +def _split_topic(topic: str): + parts = topic.split("/") + return parts[2] if len(parts) == 3 else parts[1] + + +def _map_repeat_mode(repeat_active: bool, single_active: bool): + """Try to find the correct repeat mode.""" + if repeat_active is False: + return REPEAT_MODE_OFF + if single_active is True: + return REPEAT_MODE_SINGLE + return REPEAT_MODE_PLAYLIST + + +def _get_args(config: Any, payload: str): + if "args" not in config: + return None + elif hasattr(config["args"], "__call__"): + return config["args"](payload) + return config["args"] + + +def _get_rpc_command(config: Any): + if "rpc" not in config: + return None + elif isinstance(config["rpc"], str): + return cmd_alias_definitions[config["rpc"]] + return config["rpc"] + + +def _get_kwargs(config: Any, payload: dict[str, Any]): + if "kwargs" not in config: + return None + elif hasattr(config["kwargs"], "__call__"): + return config["kwargs"](payload) + return config["kwargs"] class MQTT(threading.Thread): + global mqtt_client """A thread for monitoring the events and publish intersting events via mqtt.""" _topic_name: str - _mqtt_client: paho_mqtt + _mqtt_client: paho_mqtt.Client _attributes = dict() + _available_cmds = mqtt_cmd def __init__(self, client: paho_mqtt.Client): - super().__init__(name='MqttClient') + super().__init__(name="MqttClient") self._mqtt_client = client - self._send_throttled("state", "online") + if legacy_support_enabled is True: + logger.info("Supporting legacy mqtt commands.") + self._available_cmds = {**mqtt_cmd, **legacy_mqtt_cmd} self.daemon = True self._keep_running = True self.listen_done = threading.Event() self.action_done = threading.Event() - def _publish_attr(self, topic: str, payload: Any): - """Publish the attribute via mqtt.""" - self._publish(f"attribute/{topic}", payload) + def _subscribe(self): + self._mqtt_client.message_callback_add("phoniebox-dev/cmd/#", self._on_cmd) + + def _on_cmd(self, client, userdata, msg): + cmd = _split_topic(topic=msg.topic) + payload = msg.payload.decode("utf-8") + logger.debug(f'Received MQTT cmd "{cmd}" {payload}') + try: + config = self._available_cmds[cmd] + rpc = _get_rpc_command(config) + args = _get_args(config, payload) + kwargs = _get_kwargs(config, payload) + + if rpc is None: + logger.warn(f'No rpc call configured for MQTT command "{cmd}"') + return - def _publish(self, topic: str, payload: Any): + plugs.call_ignore_errors( + package=rpc["package"], + plugin=rpc["plugin"], + method=rpc["method"] if "method" in rpc else None, + args=args, + kwargs=kwargs, + ) + except Exception as e: + logger.error(f"Ignoring failed call: ({cmd}) {e}") + + def _publish(self, topic: str, payload: Any, *, qos=0, retain=False): """Publish the message via mqtt.""" - self._mqtt_client.publish(f"{base_topic}/{topic}", json.dumps(payload)) - - def _send_throttled(self, topic, payload): + self._mqtt_client.publish( + topic=f"{base_topic}/{topic}", + payload=json.dumps(payload), + qos=qos, + retain=retain, + ) + + def _send_throttled( + self, topic, payload, *, min_time_skip=500, qos=0, retain=False + ): """Send mqtt message throttled unless value changed.""" now = _get_current_time_milli() if topic in self._attributes: prev = self._attributes[topic] - time_since_last_update = now - prev['last_update'] - if prev['value'] == payload and time_since_last_update < 30000: + time_since_last_update = now - prev["last_update"] + if prev["value"] == payload and time_since_last_update < 30000: return - if prev['value'] != payload and time_since_last_update < 1000: + if prev["value"] != payload and time_since_last_update < min_time_skip: return - self._attributes[topic] = { - 'value': payload, - 'last_update': now - } - self._publish_attr(topic, payload) + self._attributes[topic] = {"value": payload, "last_update": now} + self._publish(topic, payload, retain=retain, qos=qos) def _send_player_state(self, payload: Any): """Map the player state data.""" - self._send_throttled("state", payload["state"]) + self._send_throttled(Mqtt_Attributes.STATE.value, payload["state"]) if "title" in payload: - self._send_throttled("title", payload["title"]) + self._send_throttled(Mqtt_Attributes.TITLE.value, payload["title"]) if "artist" in payload: - self._send_throttled("artist", payload["artist"]) + self._send_throttled(Mqtt_Attributes.ARTIST.value, payload["artist"]) if "elapsed" in payload: - self._send_throttled("elapsed", payload["elapsed"]) + self._send_throttled( + Mqtt_Attributes.ELAPSED.value, + payload["elapsed"], + min_time_skip=2000, + ) if "duration" in payload: - self._send_throttled("duration", payload["duration"]) + self._send_throttled(Mqtt_Attributes.DURATION.value, payload["duration"]) if "track" in payload: - self._send_throttled("track", payload["song"]) + self._send_throttled(Mqtt_Attributes.TRACK.value, payload["song"]) if "file" in payload: - self._send_throttled("file", payload["file"]) - self._send_throttled("random", payload["random"] == "1") + self._send_throttled(Mqtt_Attributes.FILE.value, payload["file"]) + + self._send_throttled(Mqtt_Attributes.RANDOM.value, payload["random"] == "1") + repeat_active = bool(payload["repeat"] == "1") - self._send_throttled("repeat", repeat_active) - self._send_throttled("repeat_mode", self._map_repeat_mode(repeat_active, payload["single"] == "1")) + self._send_throttled(Mqtt_Attributes.REPEAT.value, repeat_active) + self._send_throttled( + Mqtt_Attributes.REPEAT_MODE.value, + _map_repeat_mode(repeat_active, payload["single"] == "1"), + ) def _send_volume(self, payload: Any): """Map the volume data.""" - self._send_throttled("volume", payload["volume"]) - self._send_throttled("mute", bool(payload["mute"])) - - def _map_repeat_mode(self, repeat_active: bool, single_active: bool): - """Try to find the correct repeat mode.""" - if repeat_active is False: - return REPEAT_MODE_OFF - if single_active is True: - return REPEAT_MODE_SINGLE - return REPEAT_MODE_PLAYLIST + if legacy_support_enabled: + self._send_throttled(Mqtt_Attributes.VOLUME.value, payload["volume"]) + self._send_throttled(Mqtt_Attributes.MUTE.value, bool(payload["mute"])) + self._send_throttled("status/player/volume", payload["volume"]) + self._send_throttled("status/player/mute", bool(payload["mute"])) def run(self) -> None: """The main loop of the MQTT thread.""" - logger.info('Start MQTT Thread') - sub = jukebox.publishing.subscriber.Subscriber("inproc://PublisherToProxy", topics_to_send) + logger.info("Start MQTT Thread") + self._send_throttled("state", "online", qos=1, retain=True) + self._subscribe() + + sub = jukebox.publishing.subscriber.Subscriber( + "inproc://PublisherToProxy", topics_to_send + ) while self._keep_running: [topic, payload] = sub.receive() - logger.debug(f"{topic}: {payload}") if topic == "volume.level": self._send_volume(payload) elif topic == "playerstatus": self._send_player_state(payload) - logger.info('Exit MQTT Thread') + logger.info("Exit MQTT Thread") def stop(self): """Stop the mqtt thread""" - logger.debug('Stop request received') - self._publish_attr("state", 'offline') + logger.info("Stopping MQTT Thread") + self._send_throttled("state", "offline", qos=1, retain=True) self._keep_running = False self.listen_done.clear() @@ -133,7 +215,7 @@ def stop(self): def on_connect(client, userdata, flags, rc): """Start thread on successful mqtt connection.""" - global status_thread, mqtt + global mqtt logger.debug(f"Connected with result code {rc} {base_topic}") mqtt = MQTT(client) @@ -146,23 +228,24 @@ def initialize(): global mqtt_client - client_id = cfg.setndefault('mqtt', 'base_topic', value="phoniebox-future3") - username = cfg.setndefault('mqtt', 'username', value="phoniebox-dev") - password = cfg.setndefault('mqtt', 'password', value="phoniebox-dev") - host = cfg.setndefault('mqtt', 'host', value="127.0.0.1") - port = cfg.setndefault('mqtt', 'port', value=1883) + client_id = cfg.setndefault("mqtt", "base_topic", value="phoniebox-future3") + username = cfg.setndefault("mqtt", "username", value="phoniebox-dev") + password = cfg.setndefault("mqtt", "password", value="phoniebox-dev") + host = cfg.setndefault("mqtt", "host", value="127.0.0.1") + port = cfg.setndefault("mqtt", "port", value=1883) mqtt_client = paho_mqtt.Client(client_id=client_id) - mqtt_client.username_pw_set( - username=username, password=password - ) + mqtt_client.username_pw_set(username=username, password=password) mqtt_client.on_connect = on_connect + mqtt_client.will_set( + topic=f"{base_topic}/state", payload=json.dumps("offline"), qos=1, retain=True + ) mqtt_client.connect(host, port, 60) mqtt_client.loop_start() @plugs.atexit -def atexit(**ignored_kwargs): +def atexit(signal_id: int, **ignored_kwargs): global mqtt, mqtt_client mqtt.stop() mqtt_client.loop_stop() diff --git a/src/jukebox/components/mqtt/mqtt_command_alias.py b/src/jukebox/components/mqtt/mqtt_command_alias.py new file mode 100644 index 000000000..1391e163c --- /dev/null +++ b/src/jukebox/components/mqtt/mqtt_command_alias.py @@ -0,0 +1,170 @@ +""" +This file provides definitions for MQTT to RPC command aliases + +See [] +See [RPC Commands](../../builders/rpc-commands.md) +""" + +import json +from distutils.util import strtobool + +from .mqtt_const import Mqtt_Commands +from .utils import parse_repeat_mode, play_folder_recursive_args + +legacy_mqtt_cmd = { + "volumeup": {"rpc": "change_volume", "args": 1}, + "volumedown": {"rpc": "change_volume", "args": -1}, + "mute": { + "rpc": { + "package": "volume", + "plugin": "ctrl", + "method": "mute", + }, + "args": strtobool, + }, + "playerplay": {"rpc": "play"}, + "playerpause": {"rpc": "pause"}, + "playernext": {"rpc": "next_song"}, + "playerprev": {"rpc": "prev_song"}, + "playerstop": { + "rpc": { + "package": "player", + "plugin": "ctrl", + "method": "stop", + } + }, + "playerrewind": { + "rpc": { + "package": "player", + "plugin": "ctrl", + "method": "rewind", + } + }, + "playershuffle": {"rpc": "shuffle"}, + "playerreplay": { + "rpc": { + "package": "player", + "plugin": "ctrl", + "method": "replay", + } + }, + "setvolume": {"rpc": "set_volume", "args": int}, + "setmaxvolume": {"rpc": "set_soft_max_volume", "args": int}, + "shutdownafter": {"rpc": "timer_shutdown", "args": int}, + "playerstopafter": {"rpc": "timer_stop_player", "args": int}, + "playerrepeat": {"rpc": "repeat", "args": parse_repeat_mode}, + "playfolder": {"rpc": "play_folder", "args": str}, + "playfolderrecursive": { + "rpc": "play_folder", + "kwargs": play_folder_recursive_args, # kwargs: folder, recursive + }, + # "scan": {}, + # "shutdownsilent": {}, + # "disablewifi": {}, + # "setidletime": {}, + # "playerseek": {}, + # "setvolstep": {}, + # "rfid": {}, + # "gpio": {}, + # "swipecard": {}, +} + + +_player_cmds = { + Mqtt_Commands.PLAY.value: {"rpc": "play"}, + Mqtt_Commands.PLAY_FOLDER.value: { + "rpc": "play_folder", + "kwargs": json.loads, # kwargs: folder, recursive + }, + Mqtt_Commands.PLAY_ALBUM.value: { + "rpc": "play_album", + "kwargs": json.loads, # kwargs: albumartist, album + }, + Mqtt_Commands.PLAY_CARD.value: { + "rpc": "play_card", + "kwargs": json.loads, # kwargs: folder, recursive + }, + Mqtt_Commands.PLAY_SINGLE.value: { + "rpc": "play_single", + "kwargs": json.loads, # kwargs: song_url + }, + Mqtt_Commands.PAUSE.value: {"rpc": "pause"}, + Mqtt_Commands.NEXT_SONG.value: {"rpc": "next_song"}, + Mqtt_Commands.PREV_SONG.value: {"rpc": "prev_song"}, + Mqtt_Commands.STOP.value: { + "rpc": { + "package": "player", + "plugin": "ctrl", + "method": "stop", + } + }, + Mqtt_Commands.REWIND.value: { + "rpc": { + "package": "player", + "plugin": "ctrl", + "method": "rewind", + } + }, + Mqtt_Commands.SHUFFLE.value: {"rpc": "shuffle"}, + Mqtt_Commands.REPLAY.value: { + "rpc": { + "package": "player", + "plugin": "ctrl", + "method": "replay", + } + }, + Mqtt_Commands.REPEAT.value: { + "rpc": "repeat", + "kwargs": json.loads, # kwargs: option + }, +} + +_volume_cmds = { + Mqtt_Commands.CHANGE_VOLUME.value: { + "rpc": "change_volume", + "kwargs": json.loads, # kwargs: step + }, + Mqtt_Commands.SET_VOLUME.value: { + "rpc": "set_volume", + "kwargs": json.loads, # kwargs: volume + }, + Mqtt_Commands.VOLUME_MUTE.value: { + "rpc": { + "package": "volume", + "plugin": "ctrl", + "method": "mute", + }, + "kwargs": json.loads, # kwargs: mute + }, + Mqtt_Commands.SET_SOFT_MAX_VOLUME.value: { + "rpc": "set_soft_max_volume", + "kwargs": json.loads, # kwargs: max_volume + }, +} + +_system_cmd = { + Mqtt_Commands.SAY_MY_IP.value: { + "rpc": "say_my_ip", + "kwargs": json.loads, # kwargs: option + }, + Mqtt_Commands.SHUTDOWN.value: {"rpc": "shutdown"}, + Mqtt_Commands.REBOOT.value: {"rpc": "reboot"}, + Mqtt_Commands.TIMER_SHUTDOWN.value: { + "rpc": "timer_shutdown", + "kwargs": json.loads, # kwargs: value + }, + Mqtt_Commands.TIMER_STOP_PLAYER.value: { + "rpc": "timer_stop_player", + "kwargs": json.loads, # kwargs: value + }, + Mqtt_Commands.TIMER_FADE_VOLUME.value: { + "rpc": "timer_fade_volume", + "kwargs": json.loads, # kwargs: value + }, +} + +mqtt_cmd = { + **_volume_cmds, + **_system_cmd, + **_player_cmds, +} diff --git a/src/jukebox/components/mqtt/mqtt_const.py b/src/jukebox/components/mqtt/mqtt_const.py new file mode 100644 index 000000000..9574b1b05 --- /dev/null +++ b/src/jukebox/components/mqtt/mqtt_const.py @@ -0,0 +1,45 @@ +from enum import Enum + + +class Mqtt_Attributes(Enum): + STATE = "attribute/state" + TITLE = "attribute/title" + ARTIST = "attribute/artist" + ELAPSED = "attribute/elapsed" + DURATION = "attribute/duration" + TRACK = "attribute/track" + FILE = "attribute/file" + RANDOM = "attribute/random" + REPEAT = "attribute/repeat" + REPEAT_MODE = "attribute/repeat_mode" + VOLUME = "attribute/volume" + MUTE = "attribute/mute" + + +topics_to_send = ["volume.level", "playerstatus"] + + +class Mqtt_Commands(Enum): + PLAY = "play" + PLAY_FOLDER = "play_folder" + PLAY_ALBUM = "play_album" + PLAY_CARD = "play_card" + PLAY_SINGLE = "play_single" + PAUSE = "pause" + NEXT_SONG = "next_song" + PREV_SONG = "prev_song" + STOP = "stop" + REWIND = "rewind" + SHUFFLE = "shuffle" + REPLAY = "replay" + REPEAT = "repeat" + CHANGE_VOLUME = "change_volume" + SET_VOLUME = "set_volume" + VOLUME_MUTE = "volume_mute" + SET_SOFT_MAX_VOLUME = "set_soft_max_volume" + SAY_MY_IP = "say_my_ip" + SHUTDOWN = "shutdown" + REBOOT = "reboot" + TIMER_SHUTDOWN = "timer_shutdown" + TIMER_STOP_PLAYER = "timer_stop_player" + TIMER_FADE_VOLUME = "timer_fade_volume" diff --git a/src/jukebox/components/mqtt/utils.py b/src/jukebox/components/mqtt/utils.py new file mode 100644 index 000000000..f391e81ca --- /dev/null +++ b/src/jukebox/components/mqtt/utils.py @@ -0,0 +1,12 @@ +def play_folder_recursive_args(payload: str): + return {"folder": payload, "recursive": True} + + +def parse_repeat_mode(payload: str): + if payload == "single": + return "toggle_repeat_single" + elif payload == "playlist": + return "toggle_repeat" + elif payload == "disable" or payload == "off": + return None + return "toggle"