diff --git a/ocaml/xcp-rrdd/scripts/rrdd/test_api_wait_until_next_reading.py b/ocaml/xcp-rrdd/scripts/rrdd/test_api_wait_until_next_reading.py index 5ca9b897fad..196c44804c1 100644 --- a/ocaml/xcp-rrdd/scripts/rrdd/test_api_wait_until_next_reading.py +++ b/ocaml/xcp-rrdd/scripts/rrdd/test_api_wait_until_next_reading.py @@ -1,7 +1,11 @@ # Test: pytest -v -s ocaml/xcp-rrdd/scripts/rrdd/test_api_wait_until_next_reading.py """Parametrized test exercising all conditions in rrdd.API.wait_until_next_reading()""" +import json import socket +from io import BytesIO +from struct import pack, unpack from warnings import catch_warnings as import_without_warnings, simplefilter +from zlib import crc32 # Dependencies: # pip install pytest-mock @@ -77,3 +81,109 @@ def test_api_getter_functions(api): api.path = "path" assert api.get_header() == "header" assert api.get_path() == "path" + + +class MockDataSource: + def __init__(self, name, metadata, packed_data): + self.name = name + self.metadata = metadata + self.packed_data = packed_data + + def pack_data(self): + return self.packed_data + + +@pytest.mark.parametrize( + "data_sources, expected_metadata", + [ + pytest.param( + [ + MockDataSource("ds1", {"key1": "value1"}, b"\x00\x01"), + MockDataSource("ds2", {"key2": "value2"}, b"\x00\x02"), + ], + {"key1": "value1", "key2": "value2"}, + ), + pytest.param( + [MockDataSource("ds1", {"key1": "value1"}, b"\x00\x01")], + {"key1": "value1"}, + ), + pytest.param( + [], + {}, + ), + ], +) +def test_update( + mocker, + data_sources, + expected_metadata, +): + # Arrange + def checksum(*args): + return crc32(*args) & 0xFFFFFFFF + + class MockAPI(rrdd.API): + def __init__(self): # pylint: disable=super-init-not-called + self.dest = BytesIO() + self.datasources = data_sources + + def pack_data(self, ds: MockDataSource): + return ds.pack_data() + + testee = MockAPI() + testee.deregister = mocker.Mock() + fixed_time = 1234567890 + mocker.patch("time.time", return_value=fixed_time) + + # Act + testee.update() + + # Assert + + # Read and unpack the header + testee.dest.seek(0) + # The header is 20 bytes long and has the following format: + # 0-11: "DATASOURCES" (12 bytes) + # 12-15: data_checksum (4 bytes) + # 16-19: metadata_checksum (4 bytes) + # 20-23: num_datasources (4 bytes) + # 24-31: timestamp (8 bytes) + header_len = len("DATASOURCES") + 4 + 4 + 4 + 8 + header = testee.dest.read(header_len) + ( + unpacked_data_checksum, + unpacked_metadata_checksum, + unpacked_num_datasources, + unpacked_timestamp, + ) = unpack(">LLLQ", header[11:]) + + # Assert the expected unpacked header value + assert header.startswith(b"DATASOURCES") + assert unpacked_num_datasources == len(data_sources) + assert unpacked_timestamp == fixed_time + + # + # Assert datasources and the expected data checksum + # + + # Initialize the expected checksum with the fixed time + expected_checksum = checksum(pack(">Q", fixed_time)) + # Loop over the datasources and assert the packed data + testee.dest.seek(header_len) + # sourcery skip: no-loop-in-tests + for ds in data_sources: + packed_data = testee.dest.read(len(ds.pack_data())) + assert packed_data == ds.pack_data() + # Update the checksum with the packed data + expected_checksum = checksum(packed_data, expected_checksum) + + assert unpacked_data_checksum == expected_checksum + + # + # Assert metadata and the expected metadata checksum + # + metadata_length = unpack(">L", testee.dest.read(4))[0] + metadata_json = testee.dest.read(metadata_length) + + assert json.loads(metadata_json) == {"datasources": expected_metadata} + assert unpacked_metadata_checksum == checksum(metadata_json) diff --git a/python3/stubs/xcp/cmd.pyi b/python3/stubs/xcp/cmd.pyi new file mode 100644 index 00000000000..950a6d28200 --- /dev/null +++ b/python3/stubs/xcp/cmd.pyi @@ -0,0 +1,13 @@ +from basedtyping import Untyped +from typing import Any +from xcp import logger as logger +from xcp.compat import open_defaults_for_utf8_text as open_defaults_for_utf8_text + +def runCmd(command: bytes | str | list[str], with_stdout: bool = False, with_stderr: bool = False, inputtext: bytes | str | None = None, **kwargs: Any) -> Any: ... + +class OutputCache: + cache: Untyped + def __init__(self): ... + def fileContents(self, fn, *args, **kwargs) -> Untyped: ... + def runCmd(self, command, with_stdout: bool = False, with_stderr: bool = False, inputtext: Untyped | None = None, **kwargs) -> Untyped: ... + def clearCache(self): ... diff --git a/python3/stubs/xcp/compat.pyi b/python3/stubs/xcp/compat.pyi new file mode 100644 index 00000000000..bd2c7cfa4a6 --- /dev/null +++ b/python3/stubs/xcp/compat.pyi @@ -0,0 +1,9 @@ +from basedtyping import Untyped +from typing import Any, IO + +def open_textfile(filename: str, mode: str, encoding: str = 'utf-8', **kwargs: Any) -> IO[str]: ... + +open_utf8: Untyped + +def open_with_codec_handling(filename: str, mode: str = 'r', encoding: str = 'utf-8', **kwargs: Any) -> IO[Any]: ... +def open_defaults_for_utf8_text(args: tuple[Any, ...] | None, kwargs: Any) -> tuple[str, Any]: ...