From 73e1ce31efdda56618b7ab79115bd5d3cf226280 Mon Sep 17 00:00:00 2001 From: Pierre Le Marre Date: Fri, 6 Sep 2024 09:13:53 +0200 Subject: [PATCH] Further refactoring --- test/xkeyboard-config-test.py.in | 407 ++++++++++++++++--------------- 1 file changed, 217 insertions(+), 190 deletions(-) diff --git a/test/xkeyboard-config-test.py.in b/test/xkeyboard-config-test.py.in index 41b2b12d..2bfbfa2c 100755 --- a/test/xkeyboard-config-test.py.in +++ b/test/xkeyboard-config-test.py.in @@ -1,14 +1,17 @@ #!/usr/bin/env python3 import argparse -from dataclasses import dataclass +import dataclasses +import itertools import multiprocessing -import sys -import subprocess import os -from typing import Any, Generator +import subprocess +import sys import xml.etree.ElementTree as ET +from abc import ABCMeta, abstractmethod +from dataclasses import dataclass from pathlib import Path +from typing import Any, ClassVar, Generator, Iterable, TextIO, cast if sys.version_info >= (3, 11): from typing import Self @@ -21,117 +24,152 @@ DEFAULT_RULES_XML = "@XKB_CONFIG_ROOT@/rules/evdev.xml" # Meson needs to fill this in so we can call the tool in the buildir. EXTRA_PATH = "@MESON_BUILD_ROOT@" -os.environ["PATH"] = ":".join([EXTRA_PATH, os.getenv("PATH")]) - - -def escape(s): - return s.replace('"', '\\"') - - -# The function generating the progress bar (if any). -def create_progress_bar(verbose): - def noop_progress_bar(x, total, file=None): - return x - - progress_bar = noop_progress_bar - if not verbose and os.isatty(sys.stdout.fileno()): - try: - from tqdm import tqdm +os.environ["PATH"] = ":".join(filter(None, (EXTRA_PATH, os.getenv("PATH")))) - progress_bar = tqdm - except ImportError: - pass - return progress_bar - - -class Invocation: - def __init__(self, r, m, l, v, o): - self.command = "" - self.rules = r - self.model = m - self.layout = l - self.variant = v - self.option = o - self.exitstatus = 77 # default to skipped - self.error = None - self.keymap = None # The fully compiled keymap +@dataclass +class RMLVO: + rules: str + model: str + layout: str + variant: str | None + option: str | None @property - def rmlvo(self): - return self.rules, self.model, self.layout, self.variant, self.option - - @staticmethod - def rmlvo_labels(): - return "rules", "model", "layout", "variant", "option" + def __iter__(self) -> Generator[str | None, None, None]: + yield self.rules + yield self.model + yield self.layout + yield self.variant + yield self.option @property - def rmlvo_yaml(self) -> str: - return ( - "{ " - + ", ".join( - f'{key}: "{value}"' - for key, value in zip(self.rmlvo_labels(), self.rmlvo) - if value - ) - + " }" - ) + def rmlvo(self) -> dict[str, str]: + return { + k: v + for k, v in dataclasses.asdict(self).items() + # Keep only defined and non-empty values + if v is not None + # Keep only RMLVO fields (useful for subclasses) + and k in RMLVO.__dataclass_fields__ + } - @staticmethod - def parse_rmlvo(rmlvo: dict[str, str | None]): - return ( + @classmethod + def from_rmlvo( + cls, + rmlvo: dict[str, str | None], + ) -> Self: + return cls( + # We need to force a value for RML components rmlvo.get("r") or "evdev", rmlvo.get("m") or "pc105", rmlvo.get("l") or "us", - rmlvo.get("v", None), - rmlvo.get("o", None), + rmlvo.get("v"), + rmlvo.get("o"), ) + +@dataclass +class Invocation(RMLVO, metaclass=ABCMeta): + exitstatus: int = 77 # default to “skipped” + error: str | None = None + keymap: str = "" + command: str = "" # The fully compiled keymap + def __str_iter(self) -> Generator[str, None, None]: - rmlvo = ", ".join(f'"{x or ""}"' for x in self.rmlvo) - yield f"- rmlvo: [{rmlvo}]" - yield f' cmd: "{escape(self.command)}"' + yield f"- rmlvo: {self.rmlvo}" + yield f' cmd: "{self.escape(self.command)}"' yield f" status: {self.exitstatus}" if self.error: - yield f' error: "{escape(self.error.strip())}"' + yield f' error: "{self.escape(self.error.strip())}"' - def __str__(self): + def __str__(self) -> str: return "\n".join(self.__str_iter()) - def _run(self): - raise NotImplementedError + @staticmethod + def escape(s): + return s.replace('"', '\\"') + + @abstractmethod + def _run(self) -> Self: ... @classmethod - def run(cls, rmlvo) -> Self: - r, m, l, v, o = cls.parse_rmlvo(rmlvo) - tool = cls(r, m, l, v, o) - tool._run() - return tool + def run(cls, rmlvo: dict[str, str | None]) -> Self: + return cls.from_rmlvo(rmlvo)._run() + @classmethod + def run_all( + cls, + combos: Iterable[dict[str, str | None]], + combos_count: int, + njobs: int, + keymap_output_dir: Path | None, + verbose: bool, + short: bool, + progress_bar, + ) -> bool: + if keymap_output_dir: + try: + keymap_output_dir.mkdir() + except FileExistsError as e: + print(e, file=sys.stderr) + return False + + keymap_file: Path | None = None + keymap_file_fd: TextIO | None = None + + failed = False + with multiprocessing.Pool(njobs) as p: + results = p.imap_unordered(cls.run, combos) + invocation: Invocation + for invocation in progress_bar( + results, total=combos_count, file=sys.stdout + ): + if invocation.exitstatus != 0: + failed = True + target = sys.stderr + else: + target = sys.stdout if verbose else None + + if target: + if short: + print("-", invocation.rmlvo, file=target) + else: + print(invocation, file=target) + + if keymap_output_dir: + # we're running through the layouts in a somewhat sorted manner, + # so let's keep the fd open until we switch layouts + layout = invocation.layout + if invocation.variant: + layout += f"({invocation.variant})" + fname = keymap_output_dir / layout + if fname != keymap_file: + keymap_file = fname + if keymap_file_fd: + keymap_file_fd.close() + keymap_file_fd = open(keymap_file, "a") + + print(f"// {invocation.rmlvo}", file=keymap_file_fd) + print(invocation.keymap, file=keymap_file_fd) + assert keymap_file_fd + keymap_file_fd.flush() + + return failed + +@dataclass class XkbCompInvocation(Invocation): - def _run(self): - r, m, l, v, o = self.rmlvo - args = ["setxkbmap", "-print"] - if r is not None: - args.append("-rules") - args.append("{}".format(r)) - if m is not None: - args.append("-model") - args.append("{}".format(m)) - if l is not None: - args.append("-layout") - args.append("{}".format(l)) - if v is not None: - args.append("-variant") - args.append("{}".format(v)) - if o is not None: - args.append("-option") - args.append("{}".format(o)) - - xkbcomp_args = ["xkbcomp", "-xkb", "-", "-"] - - self.command = " ".join(args + ["|"] + xkbcomp_args) + xkbcomp_args: ClassVar[tuple[str, ...]] = ("xkbcomp", "-xkb", "-", "-") + + def _run(self) -> Self: + args = ( + "setxkbmap", + "-print", + *itertools.chain.from_iterable((f"-{k}", v) for k, v in self.rmlvo.items()), + ) + + self.command = " ".join(itertools.chain(args, "|", self.xkbcomp_args)) setxkbmap = subprocess.Popen( args, @@ -145,7 +183,7 @@ class XkbCompInvocation(Invocation): self.exitstatus = 90 else: xkbcomp = subprocess.Popen( - xkbcomp_args, + self.xkbcomp_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -158,28 +196,21 @@ class XkbCompInvocation(Invocation): else: self.keymap = stdout self.exitstatus = 0 + return self +@dataclass class XkbcommonInvocation(Invocation): - UNRECOGNIZED_KEYSYM_ERROR = "XKB-107" + UNRECOGNIZED_KEYSYM_ERROR: ClassVar[str] = "XKB-107" - def _run(self): - r, m, l, v, o = self.rmlvo - args = [ + def _run(self) -> Self: + args = ( "xkbcli-compile-keymap", # this is run in the builddir "--verbose", - "--rules", - r, - "--model", - m, - "--layout", - l, - ] - if v is not None: - args += ["--variant", v] - if o is not None: - args += ["--options", o] - + *itertools.chain.from_iterable( + (f"--{k}", v) for k, v in self.rmlvo.items() + ), + ) self.command = " ".join(args) try: output = subprocess.check_output( @@ -196,6 +227,7 @@ class XkbcommonInvocation(Invocation): except subprocess.CalledProcessError as err: self.error = "failed to compile keymap" self.exitstatus = err.returncode + return self @dataclass @@ -203,8 +235,33 @@ class Layout: name: str variants: list[str | None] + @classmethod + def parse(cls, e: ET.Element) -> Self: + if (name_elem := e.find("configItem/name")) is None or name_elem is None: + raise ValueError("Layout name not found") + return cls( + cls.parse_text(e.find("configItem/name")), + [None] + + [ + cls.parse_text(v) + for v in e.findall("variantList/variant/configItem/name") + ], + ) -def parse(path: Path, model, layout, variant, option): + @staticmethod + def parse_text(e: ET.Element | None) -> str: + if e is None or not e.text: + raise ValueError("Name not found") + return e.text + + +def parse_registry( + path: Path, + model: str | None, + layout: str | None, + variant: str | None, + option: str | None, +) -> tuple[int, Generator[dict[str, str | None], None, None]]: root = ET.fromstring(open(path).read()) models = ( @@ -212,95 +269,57 @@ def parse(path: Path, model, layout, variant, option): if model is None else [model] ) + if variant and not layout: raise ValueError("Variant must be set together with layout") elif layout: - layouts = (Layout(layout, variant.split(",") if variant else [None]),) - else: - layouts = tuple( + layouts: tuple[Layout, ...] = ( Layout( - e.find("configItem/name").text, - [None] - + [v.text for v in e.findall("variantList/variant/configItem/name")], - ) - for e in root.findall("layoutList/layout") + layout, + cast(list[str | None], variant.split(",")) if variant else [None], + ), ) + else: + layouts = tuple(map(Layout.parse, root.findall("layoutList/layout"))) + options = ( tuple(e.text for e in root.findall("optionList/group/option/configItem/name")) if option is None - else ([option] if option else []) + else ((option,) if option else ()) ) - combos = [] + count = len(models) * sum(len(l.variants) for l in layouts) * (1 + len(options)) - for m in models: - for l in layouts: - for v in l.variants: - combos.append({"m": m, "l": l.name, "v": v}) - for opt in options: - combos.append({"m": m, "l": l.name, "v": v, "o": opt}) + # The list of combos can be huge, so better to use a generator instead + def iter_combos() -> Generator[dict[str, str | None], None, None]: + for m in models: + for l in layouts: + for v in l.variants: + yield {"m": m, "l": l.name, "v": v} + for opt in options: + yield {"m": m, "l": l.name, "v": v, "o": opt} - return combos + return count, iter_combos() -def run( - combos, tool, njobs, keymap_output_dir, verbose: bool, short: bool, progress_bar -): - if keymap_output_dir: - keymap_output_dir = Path(keymap_output_dir) - try: - keymap_output_dir.mkdir() - except FileExistsError as e: - print(e, file=sys.stderr) - return False - - keymap_file = None - keymap_file_fd = None - - failed = False - with multiprocessing.Pool(njobs) as p: - results = p.imap_unordered(tool.run, combos) - invocation: Invocation - for invocation in progress_bar(results, total=len(combos), file=sys.stdout): - if invocation.exitstatus != 0: - failed = True - target = sys.stderr - else: - target = sys.stdout if verbose else None - - if target: - if short: - print("-", invocation.rmlvo_yaml, file=target) - else: - print(invocation, file=target) +# The function generating the progress bar (if any). +def create_progress_bar(verbose): + def noop_progress_bar(x: Iterable[Any], total: int, file=None): + return x - if keymap_output_dir: - # we're running through the layouts in a somewhat sorted manner, - # so let's keep the fd open until we switch layouts - layout = invocation.layout - if invocation.variant: - layout += f"({invocation.variant})" - fname = keymap_output_dir / layout - if fname != keymap_file: - keymap_file = fname - if keymap_file_fd: - keymap_file_fd.close() - keymap_file_fd = open(keymap_file, "a") + progress_bar: Any = noop_progress_bar + if not verbose and os.isatty(sys.stdout.fileno()): + try: + from tqdm import tqdm - rmlvo = ", ".join([x or "" for x in invocation.rmlvo]) - print(f"// {rmlvo}", file=keymap_file_fd) - print(invocation.keymap, file=keymap_file_fd) - keymap_file_fd.flush() + progress_bar = tqdm + except ImportError: + pass - return failed + return progress_bar def main(args): - tools = { - "libxkbcommon": XkbcommonInvocation, - "xkbcomp": XkbCompInvocation, - } - parser = argparse.ArgumentParser( description=""" This tool compiles a keymap for each layout, variant and @@ -317,6 +336,10 @@ def main(args): default=DEFAULT_RULES_XML, help="Path to xkeyboard-config's evdev.xml", ) + tools: dict[str, type[Invocation]] = { + "libxkbcommon": XkbcommonInvocation, + "xkbcomp": XkbCompInvocation, + } parser.add_argument( "--tool", choices=tools.keys(), @@ -328,7 +351,7 @@ def main(args): "--jobs", "-j", type=int, - default=os.cpu_count() * 4, + default=4 * (os.cpu_count() or 1), help="number of processes to use", ) parser.add_argument("--verbose", "-v", default=False, action="store_true") @@ -338,7 +361,7 @@ def main(args): parser.add_argument( "--keymap-output-dir", default=None, - type=str, + type=Path, help="Directory to print compiled keymaps to", ) parser.add_argument( @@ -366,24 +389,28 @@ def main(args): tool = tools[args.tool] - model = None if args.model == WILDCARD else args.model - layout = None if args.layout == WILDCARD else args.layout - variant = None if args.variant == WILDCARD else args.variant - option = None if args.option == WILDCARD else args.option + model: str | None = None if args.model == WILDCARD else args.model + layout: str | None = None if args.layout == WILDCARD else args.layout + variant: str | None = None if args.variant == WILDCARD else args.variant + option: str | None = None if args.option == WILDCARD else args.option if args.no_iterations: - combos = [ + combos = ( { "m": model, "l": layout, "v": variant, "o": option, - } - ] + }, + ) + count = len(combos) + iter_combos = iter(combos) else: - combos = parse(args.path, model, layout, variant, option) + count, iter_combos = parse_registry(args.path, model, layout, variant, option) - failed = run(combos, tool, args.jobs, keymapdir, verbose, short, progress_bar) + failed = tool.run_all( + iter_combos, count, args.jobs, keymapdir, verbose, short, progress_bar + ) sys.exit(failed)