Skip to content

Commit

Permalink
Add a TextDisplay that uses ANSI escapes.
Browse files Browse the repository at this point in the history
  • Loading branch information
corranwebster committed Sep 20, 2024
1 parent af8702f commit 9b95ae5
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 0 deletions.
62 changes: 62 additions & 0 deletions docs/source/examples/ansi_clock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# SPDX-FileCopyrightText: 2024-present Unital Software <[email protected]>
#
# SPDX-License-Identifier: MIT

"""ANSI-compatible text device."""

import uasyncio

from ultimo.pipelines import Dedup, apipe
from ultimo.value import Value
from ultimo_display.ansi_text_device import ANSITextDevice
from ultimo_display.text_device import ATextDevice
from ultimo_machine.time import PollRTC


@apipe
async def get_formatted(dt: tuple[int, ...], index: int):
return f"{dt[index]:02d}"


async def blink_colons(
clock: Value, text_device: ATextDevice, positions: list[tuple[int, int]]
):
async for value in clock:
for position in positions:
await text_device.display_at(":", position)
await uasyncio.sleep(0.8)
for position in positions:
await text_device.erase(1, position)


async def main():
"""Poll values from the real-time clock and print values as they change."""

text_device = ANSITextDevice()
await text_device.clear()

rtc = PollRTC()
clock = Value(await rtc())
update_clock = rtc | clock
display_hours = clock | get_formatted(4) | Dedup() | text_device.display_text(0, 0)
display_minutes = (
clock | get_formatted(5) | Dedup() | text_device.display_text(0, 3)
)
display_seconds = (
clock | get_formatted(6) | Dedup() | text_device.display_text(0, 6)
)
blink_display = blink_colons(clock, text_device, [(2, 0), (5, 0)])

# run forever
await uasyncio.gather(
update_clock.create_task(),
display_hours.create_task(),
display_minutes.create_task(),
display_seconds.create_task(),
uasyncio.create_task(blink_display),
)


if __name__ == "__main__":
# run forever
uasyncio.run(main())
34 changes: 34 additions & 0 deletions src/ultimo_display/ansi_text_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# SPDX-FileCopyrightText: 2024-present Unital Software <[email protected]>
#
# SPDX-License-Identifier: MIT

"""ANSI-compatible text device."""

from ultimo.stream import AWrite
from ultimo_display.text_device import ATextDevice


class ANSITextDevice(ATextDevice):
"""Text device that outputs ANSI control codes."""

stream: AWrite

def __init__(self, stream=None, size=(80, 25)):
if stream is None:
stream = AWrite()
self.stream = stream
self.size = size

async def display_at(self, text: str, position: tuple[int, int]):
column, row = position
await self.stream(f'\x1b[{row+1:d};{column+1:d}f' + text)

async def set_cursor(self, position: tuple[int, int]):
column, row = position
await self.stream('\x1b[%d;%dH\x1b[?25h' % (row+1, column+1))

async def clear_cursor(self):
await self.stream('\x1b[?25l')

async def clear(self):
await self.stream('\x1b[2J')
3 changes: 3 additions & 0 deletions src/ultimo_display/text_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

from ultimo.core import Consumer


class ATextDevice:
"""ABC for text-based displays."""

#: The size of the display (width, height)
size: tuple[int, int]

async def display_at(self, text: str, position: tuple[int, int]):
Expand Down

0 comments on commit 9b95ae5

Please sign in to comment.