diff --git a/supervisor/api/host.py b/supervisor/api/host.py index f6f99aac04f..6be405e2877 100644 --- a/supervisor/api/host.py +++ b/supervisor/api/host.py @@ -251,8 +251,13 @@ async def advanced_logs_handler( try: response = web.StreamResponse() response.content_type = CONTENT_TYPE_TEXT - await response.prepare(request) - async for line in journal_logs_reader(resp, log_formatter): + headers_returned = False + async for cursor, line in journal_logs_reader(resp, log_formatter): + if not headers_returned: + if cursor: + response.headers["X-First-Cursor"] = cursor + await response.prepare(request) + headers_returned = True await response.write(line.encode("utf-8") + b"\n") except ConnectionResetError as ex: raise APIError( diff --git a/supervisor/utils/systemd_journal.py b/supervisor/utils/systemd_journal.py index 5a5184f5ca2..5898d811552 100644 --- a/supervisor/utils/systemd_journal.py +++ b/supervisor/utils/systemd_journal.py @@ -22,7 +22,7 @@ def decorator(func): def wrapper(*args, **kwargs): return func(*args, **kwargs) - wrapper.required_fields = required_fields + wrapper.required_fields = ["__CURSOR"] + required_fields return wrapper return decorator @@ -60,10 +60,12 @@ def journal_verbose_formatter(entries: dict[str, str]) -> str: async def journal_logs_reader( - journal_logs: ClientResponse, - log_formatter: LogFormatter = LogFormatter.PLAIN, -) -> AsyncGenerator[str, None]: - """Read logs from systemd journal line by line, formatted using the given formatter.""" + journal_logs: ClientResponse, log_formatter: LogFormatter = LogFormatter.PLAIN +) -> AsyncGenerator[(str | None, str), None]: + """Read logs from systemd journal line by line, formatted using the given formatter. + + Returns a generator of (cursor, formatted_entry) tuples. + """ match log_formatter: case LogFormatter.PLAIN: formatter_ = journal_plain_formatter @@ -80,7 +82,7 @@ async def journal_logs_reader( # at EOF (likely race between at_eof and EOF check in readuntil) if line == b"\n" or not line: if entries: - yield formatter_(entries) + yield entries.get("__CURSOR"), formatter_(entries) entries = {} continue diff --git a/tests/host/test_logs.py b/tests/host/test_logs.py index 2017cc96bfc..86c7bf49b4d 100644 --- a/tests/host/test_logs.py +++ b/tests/host/test_logs.py @@ -40,9 +40,13 @@ async def test_logs(coresys: CoreSys, journald_gateway: MagicMock): journald_gateway.feed_eof() async with coresys.host.logs.journald_logs() as resp: - line = await anext( + cursor, line = await anext( journal_logs_reader(resp, log_formatter=LogFormatter.VERBOSE) ) + assert ( + cursor + == "s=83fee99ca0c3466db5fc120d52ca7dd8;i=203f2ce;b=f5a5c442fa6548cf97474d2d57c920b3;m=3191a3c620;t=612ccd299e7af;x=8675b540119d10bb" + ) assert ( line == "2024-03-04 02:52:56.193 homeassistant systemd[1]: Started Hostname Service." @@ -64,7 +68,11 @@ async def test_logs_coloured(coresys: CoreSys, journald_gateway: MagicMock): journald_gateway.feed_eof() async with coresys.host.logs.journald_logs() as resp: - line = await anext(journal_logs_reader(resp)) + cursor, line = await anext(journal_logs_reader(resp)) + assert ( + cursor + == "s=83fee99ca0c3466db5fc120d52ca7dd8;i=2049389;b=f5a5c442fa6548cf97474d2d57c920b3;m=4263828e8c;t=612dda478b01b;x=9ae12394c9326930" + ) assert ( line == "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m" diff --git a/tests/utils/test_systemd_journal.py b/tests/utils/test_systemd_journal.py index 0ba172b0632..26b21e97cc1 100644 --- a/tests/utils/test_systemd_journal.py +++ b/tests/utils/test_systemd_journal.py @@ -1,7 +1,7 @@ """Test systemd journal utilities.""" import asyncio -from unittest.mock import MagicMock +from unittest.mock import ANY, MagicMock import pytest @@ -89,7 +89,7 @@ async def test_parsing_simple(): """Test plain formatter.""" journal_logs, stream = _journal_logs_mock() stream.feed_data(b"MESSAGE=Hello, world!\n\n") - line = await anext(journal_logs_reader(journal_logs)) + _, line = await anext(journal_logs_reader(journal_logs)) assert line == "Hello, world!" @@ -103,7 +103,7 @@ async def test_parsing_verbose(): b"_PID=666\n" b"MESSAGE=Hello, world!\n\n" ) - line = await anext( + _, line = await anext( journal_logs_reader(journal_logs, log_formatter=LogFormatter.VERBOSE) ) assert line == "2013-09-17 07:32:51.000 homeassistant python[666]: Hello, world!" @@ -118,7 +118,7 @@ async def test_parsing_newlines_in_message(): b"AFTER=after\n\n" ) - line = await anext(journal_logs_reader(journal_logs)) + _, line = await anext(journal_logs_reader(journal_logs)) assert line == "Hello,\nworld!" @@ -135,8 +135,8 @@ async def test_parsing_newlines_in_multiple_fields(): b"AFTER=after\n\n" ) - assert await anext(journal_logs_reader(journal_logs)) == "Hello,\nworld!\n" - assert await anext(journal_logs_reader(journal_logs)) == "Hello,\nworld!" + assert await anext(journal_logs_reader(journal_logs)) == (ANY, "Hello,\nworld!\n") + assert await anext(journal_logs_reader(journal_logs)) == (ANY, "Hello,\nworld!") async def test_parsing_two_messages(): @@ -151,8 +151,31 @@ async def test_parsing_two_messages(): stream.feed_eof() reader = journal_logs_reader(journal_logs) - assert await anext(reader) == "Hello, world!" - assert await anext(reader) == "Hello again, world!" + assert await anext(reader) == (ANY, "Hello, world!") + assert await anext(reader) == (ANY, "Hello again, world!") + with pytest.raises(StopAsyncIteration): + await anext(reader) + + +async def test_cursor_parsing(): + """Test cursor is extracted correctly.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b"__CURSOR=cursor1\n" + b"MESSAGE=Hello, world!\n" + b"ID=1\n\n" + b"__CURSOR=cursor2\n" + b"MESSAGE=Hello again, world!\n" + b"ID=2\n\n" + b"MESSAGE=No cursor\n" + b"ID=2\n\n" + ) + stream.feed_eof() + + reader = journal_logs_reader(journal_logs) + assert await anext(reader) == ("cursor1", "Hello, world!") + assert await anext(reader) == ("cursor2", "Hello again, world!") + assert await anext(reader) == (None, "No cursor") with pytest.raises(StopAsyncIteration): await anext(reader) @@ -174,7 +197,7 @@ async def test_parsing_journal_host_logs(): """Test parsing of real host logs.""" journal_logs, stream = _journal_logs_mock() stream.feed_data(load_fixture("logs_export_host.txt").encode("utf-8")) - line = await anext(journal_logs_reader(journal_logs)) + _, line = await anext(journal_logs_reader(journal_logs)) assert line == "Started Hostname Service." @@ -182,7 +205,7 @@ async def test_parsing_colored_supervisor_logs(): """Test parsing of real logs with ANSI escape sequences.""" journal_logs, stream = _journal_logs_mock() stream.feed_data(load_fixture("logs_export_supervisor.txt").encode("utf-8")) - line = await anext(journal_logs_reader(journal_logs)) + _, line = await anext(journal_logs_reader(journal_logs)) assert ( line == "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m"