Skip to content

Commit

Permalink
refactor: tidy up the code and improve logs
Browse files Browse the repository at this point in the history
  • Loading branch information
c0un7-z3r0 committed Jul 2, 2024
1 parent c5c5cd3 commit c4aa79c
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 125 deletions.
202 changes: 91 additions & 111 deletions src/jukebox/components/mqtt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
"""MQTT Plugin Package."""

import json
import logging
import threading
import time
from typing import Any

import paho.mqtt.client as paho_mqtt
from components.rpc_command_alias import cmd_alias_definitions

import jukebox.cfghandler
import jukebox.plugs as plugs
Expand All @@ -17,73 +13,35 @@

from .mqtt_command_alias import legacy_mqtt_cmd, mqtt_cmd
from .mqtt_const import Mqtt_Attributes, topics_to_send
from .utils import (
get_args,
get_current_time_milli,
get_kwargs,
get_rpc_command,
map_repeat_mode,
split_topic,
)

logger = logging.getLogger("jb.mqtt")
cfg = jukebox.cfghandler.get_handler("jukebox")

base_topic = cfg.setndefault("mqtt", "base_topic", value="phoniebox-dev")
legacy_support_enabled = cfg.setndefault("mqtt", "enable_legacy", value=True)

REPEAT_MODE_OFF = "off"
REPEAT_MODE_SINGLE = "single"
REPEAT_MODE_PLAYLIST = "playlist"


def _get_current_time_milli():
return int(round(time.time() * float(1000)))


def _split_topic(topic: str):
parts = topic.split("/")
return parts[2] if len(parts) == 3 else parts[1]


def _map_repeat_mode(repeat_active: bool, single_active: bool):
"""Try to find the correct repeat mode."""
if repeat_active is False:
return REPEAT_MODE_OFF
if single_active is True:
return REPEAT_MODE_SINGLE
return REPEAT_MODE_PLAYLIST


def _get_args(config: Any, payload: str):
if "args" not in config:
return None
elif hasattr(config["args"], "__call__"):
return config["args"](payload)
return config["args"]


def _get_rpc_command(config: Any):
if "rpc" not in config:
return None
elif isinstance(config["rpc"], str):
return cmd_alias_definitions[config["rpc"]]
return config["rpc"]


def _get_kwargs(config: Any, payload: dict[str, Any]):
if "kwargs" not in config:
return None
elif hasattr(config["kwargs"], "__call__"):
return config["kwargs"](payload)
return config["kwargs"]


class MQTT(threading.Thread):
global mqtt_client
"""A thread for monitoring the events and publish intersting events via mqtt."""
"""A thread for monitoring events and publishing interesting events via MQTT."""

_topic_name: str
_mqtt_client: paho_mqtt.Client
_attributes = dict()
_attributes: dict = {}
_available_cmds = mqtt_cmd

def __init__(self, client: paho_mqtt.Client):
super().__init__(name="MqttClient")
self._mqtt_client = client
if legacy_support_enabled is True:
logger.info("Supporting legacy mqtt commands.")
if legacy_support_enabled:
logger.info("Supporting legacy MQTT commands.")
self._available_cmds = {**mqtt_cmd, **legacy_mqtt_cmd}

self.daemon = True
Expand All @@ -92,34 +50,61 @@ def __init__(self, client: paho_mqtt.Client):
self.action_done = threading.Event()

def _subscribe(self):
logger.debug("Subscribing to MQTT topics.")
self._mqtt_client.message_callback_add("phoniebox-dev/cmd/#", self._on_cmd)

def _on_cmd(self, client, userdata, msg):
cmd = _split_topic(topic=msg.topic)
cmd = split_topic(topic=msg.topic)
payload = msg.payload.decode("utf-8")
logger.debug(f'Received MQTT cmd "{cmd}" {payload}')
logger.debug(f'Received MQTT command "{cmd}" with payload "{payload}"')
try:
config = self._available_cmds[cmd]
rpc = _get_rpc_command(config)
args = _get_args(config, payload)
kwargs = _get_kwargs(config, payload)
config = self._available_cmds.get(cmd)
if not config:
logger.warning(f'No configuration found for MQTT command "{cmd}"')
return

rpc = get_rpc_command(config)
args = get_args(config, payload)
kwargs = get_kwargs(config, payload)

if rpc is None:
logger.warn(f'No rpc call configured for MQTT command "{cmd}"')
logger.warning(f'No RPC call configured for MQTT command "{cmd}"')
return

plugs.call_ignore_errors(
package=rpc["package"],
plugin=rpc["plugin"],
method=rpc["method"] if "method" in rpc else None,
args=args,
kwargs=kwargs,
)
package = rpc.get("package")
plugin = rpc.get("plugin")
method = rpc.get("method")

if package is None:
raise ValueError(
f'Missing "package" attribute for MQTT command "{cmd}"'
)
elif plugin is None:
raise ValueError(f'Missing "plugin" attribute for MQTT command "{cmd}"')
elif method is None:
raise ValueError(f'Missing "method" attribute for MQTT command "{cmd}"')
else:
logger.info(
f'Executing MQTT command "{cmd}" with package="{package}",'
+ f'plugin="{plugin}", method="{method}", args={args}, kwargs={kwargs}'
)
plugs.call_ignore_errors(
package=package,
plugin=plugin,
method=method,
args=args,
kwargs=kwargs,
)
except Exception as e:
logger.error(f"Ignoring failed call: ({cmd}) {e}")
logger.error(
f"Ignoring failed call for MQTT command '{cmd}': {e}", exc_info=True
)

def _publish(self, topic: str, payload: Any, *, qos=0, retain=False):
"""Publish the message via mqtt."""
"""Publish a message via MQTT."""
logger.debug(
f'Publishing to topic "{topic}" with payload "{payload}", qos={qos}, retain={retain}'
)
self._mqtt_client.publish(
topic=f"{base_topic}/{topic}",
payload=json.dumps(payload),
Expand All @@ -128,10 +113,10 @@ def _publish(self, topic: str, payload: Any, *, qos=0, retain=False):
)

def _send_throttled(
self, topic, payload, *, min_time_skip=500, qos=0, retain=False
self, topic: str, payload: Any, *, min_time_skip=500, qos=0, retain=False
):
"""Send mqtt message throttled unless value changed."""
now = _get_current_time_milli()
"""Send an MQTT message throttled unless value has changed."""
now = get_current_time_milli()

if topic in self._attributes:
prev = self._attributes[topic]
Expand All @@ -141,65 +126,57 @@ def _send_throttled(
if prev["value"] != payload and time_since_last_update < min_time_skip:
return

logger.debug(
f'Sending throttled message for topic "{topic}" with payload "{payload}"'
)
self._attributes[topic] = {"value": payload, "last_update": now}
self._publish(topic, payload, retain=retain, qos=qos)

def _send_player_state(self, payload: Any):
"""Map the player state data."""
"""Map player state data."""
self._send_throttled(Mqtt_Attributes.STATE.value, payload["state"])
if "title" in payload:
self._send_throttled(Mqtt_Attributes.TITLE.value, payload["title"])
if "artist" in payload:
self._send_throttled(Mqtt_Attributes.ARTIST.value, payload["artist"])
if "elapsed" in payload:
self._send_throttled(
Mqtt_Attributes.ELAPSED.value,
payload["elapsed"],
min_time_skip=2000,
)
if "duration" in payload:
self._send_throttled(Mqtt_Attributes.DURATION.value, payload["duration"])
if "track" in payload:
self._send_throttled(Mqtt_Attributes.TRACK.value, payload["song"])
if "file" in payload:
self._send_throttled(Mqtt_Attributes.FILE.value, payload["file"])
for attr in ["title", "artist", "elapsed", "duration", "track", "file"]:
if attr in payload:
self._send_throttled(Mqtt_Attributes[attr.upper()].value, payload[attr])

self._send_throttled(Mqtt_Attributes.RANDOM.value, payload["random"] == "1")
self._send_throttled(Mqtt_Attributes.RANDOM.value, payload.get("random") == "1")

repeat_active = bool(payload["repeat"] == "1")
repeat_active = bool(payload.get("repeat") == "1")
self._send_throttled(Mqtt_Attributes.REPEAT.value, repeat_active)
self._send_throttled(
Mqtt_Attributes.REPEAT_MODE.value,
_map_repeat_mode(repeat_active, payload["single"] == "1"),
map_repeat_mode(repeat_active, payload.get("single") == "1"),
)

def _send_volume(self, payload: Any):
"""Map the volume data."""
"""Map volume data."""
logger.debug(f"Sending volume update with payload: {payload}")
if legacy_support_enabled:
self._send_throttled(Mqtt_Attributes.VOLUME.value, payload["volume"])
self._send_throttled(Mqtt_Attributes.MUTE.value, bool(payload["mute"]))
self._send_throttled("status/player/volume", payload["volume"])
self._send_throttled("status/player/mute", bool(payload["mute"]))
self._send_throttled(Mqtt_Attributes.VOLUME.value, payload.get("volume"))
self._send_throttled(Mqtt_Attributes.MUTE.value, bool(payload.get("mute")))
self._send_throttled("status/player/volume", payload.get("volume"))
self._send_throttled("status/player/mute", bool(payload.get("mute")))

def run(self) -> None:
"""The main loop of the MQTT thread."""
logger.info("Start MQTT Thread")
"""Main loop of the MQTT thread."""
logger.info("Starting MQTT Thread")
self._send_throttled("state", "online", qos=1, retain=True)
self._send_throttled("version", jukebox.version(), qos=1, retain=True) # type: ignore
self._subscribe()

sub = jukebox.publishing.subscriber.Subscriber(
"inproc://PublisherToProxy", topics_to_send
)
while self._keep_running:
[topic, payload] = sub.receive()
topic, payload = sub.receive()
if topic == "volume.level":
self._send_volume(payload)
elif topic == "playerstatus":
self._send_player_state(payload)
logger.info("Exit MQTT Thread")
logger.info("Exiting MQTT Thread")

def stop(self):
"""Stop the mqtt thread"""
"""Stop the MQTT thread."""
logger.info("Stopping MQTT Thread")
self._send_throttled("state", "offline", qos=1, retain=True)

Expand All @@ -213,27 +190,27 @@ def stop(self):


def on_connect(client, userdata, flags, rc):
"""Start thread on successful mqtt connection."""

"""Start thread on successful MQTT connection."""
global mqtt
logger.debug(f"Connected with result code {rc} {base_topic}")

logger.debug(f"Connected with result code {rc} to {base_topic}")
mqtt = MQTT(client)
mqtt.start()


@plugs.initialize
def initialize():
"""Setup connection and trigger the mqtt loop."""

"""Setup connection and trigger the MQTT loop."""
global mqtt_client

client_id = cfg.setndefault("mqtt", "base_topic", value="phoniebox-future3")
client_id = cfg.setndefault("mqtt", "client_id", value="phoniebox-future3")
username = cfg.setndefault("mqtt", "username", value="phoniebox-dev")
password = cfg.setndefault("mqtt", "password", value="phoniebox-dev")
host = cfg.setndefault("mqtt", "host", value="127.0.0.1")
port = cfg.setndefault("mqtt", "port", value=1883)

logger.info(
f"Initializing MQTT client with client_id={client_id}, username={username}, host={host}, port={port}"
)
mqtt_client = paho_mqtt.Client(client_id=client_id)
mqtt_client.username_pw_set(username=username, password=password)
mqtt_client.on_connect = on_connect
Expand All @@ -242,11 +219,14 @@ def initialize():
)
mqtt_client.connect(host, port, 60)
mqtt_client.loop_start()
logger.info("MQTT client initialized and loop started")


@plugs.atexit
def atexit(signal_id: int, **ignored_kwargs):
global mqtt, mqtt_client
logger.info("Executing atexit handler, stopping MQTT client")
mqtt.stop()
mqtt_client.loop_stop()
mqtt_client.disconnect()
logger.info("MQTT client stopped and disconnected")
44 changes: 36 additions & 8 deletions src/jukebox/components/mqtt/mqtt_command_alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,21 @@
"""

import json
from distutils.util import strtobool

import jukebox.plugs as plugs

from .mqtt_const import Mqtt_Commands
from .utils import parse_repeat_mode, play_folder_recursive_args


def get_mute(payload) -> bool:
"""Helper to toggle mute in legacy support."""
is_mute = plugs.call_ignore_errors(
package="volume", plugin="ctrl", method="get_mute"
)
return not is_mute


legacy_mqtt_cmd = {
"volumeup": {"rpc": "change_volume", "args": 1},
"volumedown": {"rpc": "change_volume", "args": -1},
Expand All @@ -20,7 +30,7 @@
"plugin": "ctrl",
"method": "mute",
},
"args": strtobool,
"args": get_mute,
},
"playerplay": {"rpc": "play"},
"playerpause": {"rpc": "pause"},
Expand Down Expand Up @@ -48,12 +58,30 @@
"method": "replay",
}
},
"setvolume": {"rpc": "set_volume", "args": int},
"setmaxvolume": {"rpc": "set_soft_max_volume", "args": int},
"shutdownafter": {"rpc": "timer_shutdown", "args": int},
"playerstopafter": {"rpc": "timer_stop_player", "args": int},
"playerrepeat": {"rpc": "repeat", "args": parse_repeat_mode},
"playfolder": {"rpc": "play_folder", "args": str},
"setvolume": {
"rpc": "set_volume",
"args": int,
},
"setmaxvolume": {
"rpc": "set_soft_max_volume",
"args": int,
},
"shutdownafter": {
"rpc": "timer_shutdown",
"args": int,
},
"playerstopafter": {
"rpc": "timer_stop_player",
"args": int,
},
"playerrepeat": {
"rpc": "repeat",
"args": parse_repeat_mode,
},
"playfolder": {
"rpc": "play_folder",
"args": str,
},
"playfolderrecursive": {
"rpc": "play_folder",
"kwargs": play_folder_recursive_args, # kwargs: folder, recursive
Expand Down
Loading

0 comments on commit c4aa79c

Please sign in to comment.