From 9b95ae592a6ee6246a211a3d30c5951dcbd456a6 Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Fri, 20 Sep 2024 07:32:43 +0100 Subject: [PATCH] Add a TextDisplay that uses ANSI escapes. --- docs/source/examples/ansi_clock.py | 62 ++++++++++++++++++++++++++ src/ultimo_display/ansi_text_device.py | 34 ++++++++++++++ src/ultimo_display/text_device.py | 3 ++ 3 files changed, 99 insertions(+) create mode 100644 docs/source/examples/ansi_clock.py create mode 100644 src/ultimo_display/ansi_text_device.py diff --git a/docs/source/examples/ansi_clock.py b/docs/source/examples/ansi_clock.py new file mode 100644 index 0000000..85b272f --- /dev/null +++ b/docs/source/examples/ansi_clock.py @@ -0,0 +1,62 @@ +# SPDX-FileCopyrightText: 2024-present Unital Software +# +# SPDX-License-Identifier: MIT + +"""ANSI-compatible text device.""" + +import uasyncio + +from ultimo.pipelines import Dedup, apipe +from ultimo.value import Value +from ultimo_display.ansi_text_device import ANSITextDevice +from ultimo_display.text_device import ATextDevice +from ultimo_machine.time import PollRTC + + +@apipe +async def get_formatted(dt: tuple[int, ...], index: int): + return f"{dt[index]:02d}" + + +async def blink_colons( + clock: Value, text_device: ATextDevice, positions: list[tuple[int, int]] +): + async for value in clock: + for position in positions: + await text_device.display_at(":", position) + await uasyncio.sleep(0.8) + for position in positions: + await text_device.erase(1, position) + + +async def main(): + """Poll values from the real-time clock and print values as they change.""" + + text_device = ANSITextDevice() + await text_device.clear() + + rtc = PollRTC() + clock = Value(await rtc()) + update_clock = rtc | clock + display_hours = clock | get_formatted(4) | Dedup() | text_device.display_text(0, 0) + display_minutes = ( + clock | get_formatted(5) | Dedup() | text_device.display_text(0, 3) + ) + display_seconds = ( + clock | get_formatted(6) | Dedup() | text_device.display_text(0, 6) + ) + blink_display = blink_colons(clock, text_device, [(2, 0), (5, 0)]) + + # run forever + await uasyncio.gather( + update_clock.create_task(), + display_hours.create_task(), + display_minutes.create_task(), + display_seconds.create_task(), + uasyncio.create_task(blink_display), + ) + + +if __name__ == "__main__": + # run forever + uasyncio.run(main()) diff --git a/src/ultimo_display/ansi_text_device.py b/src/ultimo_display/ansi_text_device.py new file mode 100644 index 0000000..82fe951 --- /dev/null +++ b/src/ultimo_display/ansi_text_device.py @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: 2024-present Unital Software +# +# SPDX-License-Identifier: MIT + +"""ANSI-compatible text device.""" + +from ultimo.stream import AWrite +from ultimo_display.text_device import ATextDevice + + +class ANSITextDevice(ATextDevice): + """Text device that outputs ANSI control codes.""" + + stream: AWrite + + def __init__(self, stream=None, size=(80, 25)): + if stream is None: + stream = AWrite() + self.stream = stream + self.size = size + + async def display_at(self, text: str, position: tuple[int, int]): + column, row = position + await self.stream(f'\x1b[{row+1:d};{column+1:d}f' + text) + + async def set_cursor(self, position: tuple[int, int]): + column, row = position + await self.stream('\x1b[%d;%dH\x1b[?25h' % (row+1, column+1)) + + async def clear_cursor(self): + await self.stream('\x1b[?25l') + + async def clear(self): + await self.stream('\x1b[2J') diff --git a/src/ultimo_display/text_device.py b/src/ultimo_display/text_device.py index 25ad002..b99e5c8 100644 --- a/src/ultimo_display/text_device.py +++ b/src/ultimo_display/text_device.py @@ -6,8 +6,11 @@ from ultimo.core import Consumer + class ATextDevice: + """ABC for text-based displays.""" + #: The size of the display (width, height) size: tuple[int, int] async def display_at(self, text: str, position: tuple[int, int]):