Skip to content

Commit

Permalink
Return cursor of the first host logs entry via headers (#5333)
Browse files Browse the repository at this point in the history
* Return cursor of the first host logs entry via headers

Return the first entry's cursor via a custom `X-First-Cursor` header that
can be consumed by the client and used for repeatedly requesting the
historic logs. Once the first fetch returns data, the cursor can be
supplied as the first argument to the Range header in another call,
fetching an accurate slice of the journal with the previous log entries
using the `Range: entries=cursor[[:num_skip]:num_entries]` syntax.

Let's say we fetch logs with the Range header `entries=:-19:20` (to
fetch the very last 20 lines of the logs; see below why not
`entries=:-20:20`) and we get `cursor50` in the cursor header of the
reply (the actual value will be much more complex and with no guaranteed
format). To fetch the previous slice of the logs, we use
`entries=cursor50:-20:20`, which would return the 20 lines preceding
`cursor50`, and `cursor30` in the cursor header. This way we can go all
the way back through the history.

One problem with the cursor is that it's not possible to determine when
a negative `num_skip` points beyond the first log entry. In that case
the client either needs to know what the first entry is (via
`entries=:0:1`) or can iterate naively and stop once two subsequent
requests return the same first cursor.

Another caveat, even though it's unlikely to be hit in real usage,
is that it's not possible to fetch only the last line: if no cursor is
provided, a negative `num_skip` argument is needed, and in that case we're
pointing one record back from the current cursor, which is the previous
record. The least we can return without knowing any cursor is thus
`entries=:-1:2` (where the `2` can be omitted; however, with
`entries=:-1:1` we would lose the last line). This also explains why
different `num_skip` and `num_entries` must be used for the first fetch.

* Fix typo (fallback->callback)

* Refactor journal_logs_reader to always return the cursor

* Update tests for new cursor handling
  • Loading branch information
sairon authored Oct 9, 2024
1 parent e0d7985 commit 9f3767b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 20 deletions.
9 changes: 7 additions & 2 deletions supervisor/api/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,13 @@ async def advanced_logs_handler(
try:
response = web.StreamResponse()
response.content_type = CONTENT_TYPE_TEXT
await response.prepare(request)
async for line in journal_logs_reader(resp, log_formatter):
headers_returned = False
async for cursor, line in journal_logs_reader(resp, log_formatter):
if not headers_returned:
if cursor:
response.headers["X-First-Cursor"] = cursor
await response.prepare(request)
headers_returned = True
await response.write(line.encode("utf-8") + b"\n")
except ConnectionResetError as ex:
raise APIError(
Expand Down
14 changes: 8 additions & 6 deletions supervisor/utils/systemd_journal.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def decorator(func):
def wrapper(*args, **kwargs):
return func(*args, **kwargs)

wrapper.required_fields = required_fields
wrapper.required_fields = ["__CURSOR"] + required_fields
return wrapper

return decorator
Expand Down Expand Up @@ -60,10 +60,12 @@ def journal_verbose_formatter(entries: dict[str, str]) -> str:


async def journal_logs_reader(
journal_logs: ClientResponse,
log_formatter: LogFormatter = LogFormatter.PLAIN,
) -> AsyncGenerator[str, None]:
"""Read logs from systemd journal line by line, formatted using the given formatter."""
journal_logs: ClientResponse, log_formatter: LogFormatter = LogFormatter.PLAIN
) -> AsyncGenerator[(str | None, str), None]:
"""Read logs from systemd journal line by line, formatted using the given formatter.
Returns a generator of (cursor, formatted_entry) tuples.
"""
match log_formatter:
case LogFormatter.PLAIN:
formatter_ = journal_plain_formatter
Expand All @@ -80,7 +82,7 @@ async def journal_logs_reader(
# at EOF (likely race between at_eof and EOF check in readuntil)
if line == b"\n" or not line:
if entries:
yield formatter_(entries)
yield entries.get("__CURSOR"), formatter_(entries)
entries = {}
continue

Expand Down
12 changes: 10 additions & 2 deletions tests/host/test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ async def test_logs(coresys: CoreSys, journald_gateway: MagicMock):
journald_gateway.feed_eof()

async with coresys.host.logs.journald_logs() as resp:
line = await anext(
cursor, line = await anext(
journal_logs_reader(resp, log_formatter=LogFormatter.VERBOSE)
)
assert (
cursor
== "s=83fee99ca0c3466db5fc120d52ca7dd8;i=203f2ce;b=f5a5c442fa6548cf97474d2d57c920b3;m=3191a3c620;t=612ccd299e7af;x=8675b540119d10bb"
)
assert (
line
== "2024-03-04 02:52:56.193 homeassistant systemd[1]: Started Hostname Service."
Expand All @@ -64,7 +68,11 @@ async def test_logs_coloured(coresys: CoreSys, journald_gateway: MagicMock):
journald_gateway.feed_eof()

async with coresys.host.logs.journald_logs() as resp:
line = await anext(journal_logs_reader(resp))
cursor, line = await anext(journal_logs_reader(resp))
assert (
cursor
== "s=83fee99ca0c3466db5fc120d52ca7dd8;i=2049389;b=f5a5c442fa6548cf97474d2d57c920b3;m=4263828e8c;t=612dda478b01b;x=9ae12394c9326930"
)
assert (
line
== "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m"
Expand Down
43 changes: 33 additions & 10 deletions tests/utils/test_systemd_journal.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Test systemd journal utilities."""

import asyncio
from unittest.mock import MagicMock
from unittest.mock import ANY, MagicMock

import pytest

Expand Down Expand Up @@ -89,7 +89,7 @@ async def test_parsing_simple():
"""Test plain formatter."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(b"MESSAGE=Hello, world!\n\n")
line = await anext(journal_logs_reader(journal_logs))
_, line = await anext(journal_logs_reader(journal_logs))
assert line == "Hello, world!"


Expand All @@ -103,7 +103,7 @@ async def test_parsing_verbose():
b"_PID=666\n"
b"MESSAGE=Hello, world!\n\n"
)
line = await anext(
_, line = await anext(
journal_logs_reader(journal_logs, log_formatter=LogFormatter.VERBOSE)
)
assert line == "2013-09-17 07:32:51.000 homeassistant python[666]: Hello, world!"
Expand All @@ -118,7 +118,7 @@ async def test_parsing_newlines_in_message():
b"AFTER=after\n\n"
)

line = await anext(journal_logs_reader(journal_logs))
_, line = await anext(journal_logs_reader(journal_logs))
assert line == "Hello,\nworld!"


Expand All @@ -135,8 +135,8 @@ async def test_parsing_newlines_in_multiple_fields():
b"AFTER=after\n\n"
)

assert await anext(journal_logs_reader(journal_logs)) == "Hello,\nworld!\n"
assert await anext(journal_logs_reader(journal_logs)) == "Hello,\nworld!"
assert await anext(journal_logs_reader(journal_logs)) == (ANY, "Hello,\nworld!\n")
assert await anext(journal_logs_reader(journal_logs)) == (ANY, "Hello,\nworld!")


async def test_parsing_two_messages():
Expand All @@ -151,8 +151,31 @@ async def test_parsing_two_messages():
stream.feed_eof()

reader = journal_logs_reader(journal_logs)
assert await anext(reader) == "Hello, world!"
assert await anext(reader) == "Hello again, world!"
assert await anext(reader) == (ANY, "Hello, world!")
assert await anext(reader) == (ANY, "Hello again, world!")
with pytest.raises(StopAsyncIteration):
await anext(reader)


async def test_cursor_parsing():
    """Verify the reader yields each entry's cursor, or None when it has none."""
    journal_logs, stream = _journal_logs_mock()
    # Three journal export entries: two carrying a __CURSOR field, one without.
    payload = (
        b"__CURSOR=cursor1\n"
        b"MESSAGE=Hello, world!\n"
        b"ID=1\n\n"
        b"__CURSOR=cursor2\n"
        b"MESSAGE=Hello again, world!\n"
        b"ID=2\n\n"
        b"MESSAGE=No cursor\n"
        b"ID=2\n\n"
    )
    stream.feed_data(payload)
    stream.feed_eof()

    expected_items = [
        ("cursor1", "Hello, world!"),
        ("cursor2", "Hello again, world!"),
        (None, "No cursor"),
    ]

    reader = journal_logs_reader(journal_logs)
    for expected in expected_items:
        assert await anext(reader) == expected

    # Once every entry has been consumed, the generator must be exhausted.
    with pytest.raises(StopAsyncIteration):
        await anext(reader)

Expand All @@ -174,15 +197,15 @@ async def test_parsing_journal_host_logs():
"""Test parsing of real host logs."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(load_fixture("logs_export_host.txt").encode("utf-8"))
line = await anext(journal_logs_reader(journal_logs))
_, line = await anext(journal_logs_reader(journal_logs))
assert line == "Started Hostname Service."


async def test_parsing_colored_supervisor_logs():
"""Test parsing of real logs with ANSI escape sequences."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(load_fixture("logs_export_supervisor.txt").encode("utf-8"))
line = await anext(journal_logs_reader(journal_logs))
_, line = await anext(journal_logs_reader(journal_logs))
assert (
line
== "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m"
Expand Down

0 comments on commit 9f3767b

Please sign in to comment.