From e4b84f9d22c19730c9b8580c08e844193a487b3f Mon Sep 17 00:00:00 2001 From: SergioSim Date: Wed, 15 Nov 2023 16:19:24 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=8F=97=EF=B8=8F(backends)=20migrate=20LRS?= =?UTF-8?q?HTTPBackend=20to=20LRSDataBackend?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The interface of the http backends is now very similar to the data backends. Thus, to limit redundant interfaces we opt to move the LRSHTTPBackend and AsyncLRSHTTPBackend to the data backend package. We also update the cli write command to align more with the data backend interface. Moreover, the LRSDataBackend no longer inherits from the AsyncLRSDataBackend to allow its usage in an async context. Finally, the behavior of the sync and async versions of the lrs data backend is very similar. Thus, to keep both implementations in line, we test both of them in the same file. --- .env.dist | 14 +- docs/backends/index.md | 22 +- src/ralph/backends/data/async_lrs.py | 295 +++++++ src/ralph/backends/data/fs.py | 5 +- src/ralph/backends/data/lrs.py | 319 ++++++++ src/ralph/backends/http/async_lrs.py | 392 --------- src/ralph/backends/http/base.py | 151 ---- src/ralph/backends/http/lrs.py | 71 -- src/ralph/backends/loader.py | 5 +- src/ralph/cli.py | 116 +-- tests/backends/data/test_async_lrs.py | 766 +++++++++++++++++ tests/backends/data/test_base.py | 6 +- tests/backends/http/__init__.py | 0 tests/backends/http/test_async_lrs.py | 1083 ------------------------- tests/backends/http/test_base.py | 33 - tests/backends/http/test_lrs.py | 167 ---- tests/backends/test_loader.py | 12 +- tests/conftest.py | 2 + tests/fixtures/backends.py | 73 +- tests/test_cli.py | 4 +- tests/test_cli_usage.py | 40 +- tests/test_dependencies.py | 12 +- 22 files changed, 1550 insertions(+), 2038 deletions(-) create mode 100644 src/ralph/backends/data/async_lrs.py create mode 100644 src/ralph/backends/data/lrs.py delete mode 100644 
src/ralph/backends/http/async_lrs.py delete mode 100644 src/ralph/backends/http/base.py delete mode 100644 src/ralph/backends/http/lrs.py create mode 100644 tests/backends/data/test_async_lrs.py delete mode 100644 tests/backends/http/__init__.py delete mode 100644 tests/backends/http/test_async_lrs.py delete mode 100644 tests/backends/http/test_base.py delete mode 100644 tests/backends/http/test_lrs.py diff --git a/.env.dist b/.env.dist index 80721d719..8a326f539 100644 --- a/.env.dist +++ b/.env.dist @@ -136,13 +136,13 @@ RALPH_BACKENDS__DATA__CLICKHOUSE__TEST_TABLE_NAME=test_xapi_events_all # LRS HTTP backend -RALPH_BACKENDS__HTTP__LRS__BASE_URL=http://ralph:secret@0.0.0.0:8100/ -RALPH_BACKENDS__HTTP__LRS__USERNAME=ralph -RALPH_BACKENDS__HTTP__LRS__PASSWORD=secret -RALPH_BACKENDS__HTTP__LRS__HEADERS__X_EXPERIENCE_API_VERSION=1.0.3 -RALPH_BACKENDS__HTTP__LRS__HEADERS__CONTENT_TYPE=application/json -RALPH_BACKENDS__HTTP__LRS__STATUS_ENDPOINT=/__heartbeat__ -RALPH_BACKENDS__HTTP__LRS__STATEMENTS_ENDPOINT=/xAPI/statements +RALPH_BACKENDS__DATA__LRS__BASE_URL=http://ralph:secret@0.0.0.0:8100/ +RALPH_BACKENDS__DATA__LRS__USERNAME=ralph +RALPH_BACKENDS__DATA__LRS__PASSWORD=secret +RALPH_BACKENDS__DATA__LRS__HEADERS__X_EXPERIENCE_API_VERSION=1.0.3 +RALPH_BACKENDS__DATA__LRS__HEADERS__CONTENT_TYPE=application/json +RALPH_BACKENDS__DATA__LRS__STATUS_ENDPOINT=/__heartbeat__ +RALPH_BACKENDS__DATA__LRS__STATEMENTS_ENDPOINT=/xAPI/statements # Sentry diff --git a/docs/backends/index.md b/docs/backends/index.md index 27f615241..0fabc5a4a 100644 --- a/docs/backends/index.md +++ b/docs/backends/index.md @@ -28,7 +28,7 @@ ralph list --backend swift # [...] 
more options The general patterns for backend parameters are: - `--{{ backend_name }}-{{ parameter | underscore_to_dash }}` for command options, and, -- `RALPH_BACKENDS__{{ backend_type | uppercase }}__{{ backend_name | uppercase }}__{{ parameter | uppercase }}` for environment variables, where the `backend_type` is one of `DATA`, `LRS` and `STREAM`. +- `RALPH_BACKENDS__DATA__{{ backend_name | uppercase }}__{{ parameter | uppercase }}` for environment variables. ## Elasticsearch @@ -69,6 +69,11 @@ documents from it. members: - attributes +The ClickHouse client options supported in Ralph can be found in these locations: + +- [Python driver specific](https://clickhouse.com/docs/en/integrations/language-clients/python/driver-api#settings-argument) +- [General ClickHouse client settings](https://clickhouse.com/docs/en/operations/settings/settings/) + ## OVH - Log Data Platform (LDP) LDP is a nice service built by OVH on top of Graylog to follow, analyse and @@ -150,23 +155,12 @@ The only required parameter is the `path` we want to list or stream content from members: - attributes -The ClickHouse client options supported in Ralph can be found in these locations: - -- [Python driver specific](https://clickhouse.com/docs/en/integrations/language-clients/python/driver-api#settings-argument) -- [General ClickHouse client settings](https://clickhouse.com/docs/en/operations/settings/settings/) - -## Learning Record Store (LRS) - HTTP backend interface - -!!! warning - - `HTTP` backend type will soon be merged into the `Data` backend type. - - PR [#521](https://github.com/openfun/ralph/pull/521) prepares `Data` backend alignments with `HTTP` backend `read` method signature +## Learning Record Store (LRS) The LRS backend is used to store and retrieve xAPI statements from various systems that follow the [xAPI specification](https://github.com/adlnet/xAPI-Spec/tree/master) (such as our own Ralph LRS, which can be run from this package). 
LRS systems are mostly used in e-learning infrastructures. -### ::: ralph.backends.http.async_lrs.LRSHTTPBackendSettings +### ::: ralph.backends.data.lrs.LRSDataBackendSettings handler: python options: show_root_heading: false diff --git a/src/ralph/backends/data/async_lrs.py b/src/ralph/backends/data/async_lrs.py new file mode 100644 index 000000000..8c8102144 --- /dev/null +++ b/src/ralph/backends/data/async_lrs.py @@ -0,0 +1,295 @@ +"""Async LRS data backend for Ralph.""" + +from io import IOBase +from typing import AsyncIterator, Iterable, Optional, Union +from urllib.parse import ParseResult, parse_qs, urljoin, urlparse + +from httpx import AsyncClient, HTTPError, HTTPStatusError, RequestError +from pydantic import AnyHttpUrl, PositiveInt, parse_obj_as + +from ralph.backends.data.base import ( + AsyncWritable, + BaseAsyncDataBackend, + BaseOperationType, + DataBackendStatus, + Loggable, +) +from ralph.backends.data.lrs import LRSDataBackendSettings, StatementResponse +from ralph.backends.lrs.base import LRSStatementsQuery +from ralph.exceptions import BackendException +from ralph.utils import async_parse_dict_to_bytes, iter_by_batch, parse_iterable_to_dict + + +class AsyncLRSDataBackend( + BaseAsyncDataBackend[LRSDataBackendSettings, LRSStatementsQuery], + AsyncWritable, + Loggable, +): + """Asynchronous LRS data backend.""" + + name = "async_lrs" + default_operation_type = BaseOperationType.CREATE + unsupported_operation_types = { + BaseOperationType.APPEND, + BaseOperationType.UPDATE, + BaseOperationType.DELETE, + } + + def __init__(self, settings: Optional[LRSDataBackendSettings] = None) -> None: + """Instantiate the LRS HTTP (basic auth) backend client. + + Args: + settings (LRSDataBackendSettings or None): The LRS data backend settings. + If `settings` is `None`, a default settings instance is used instead. 
+ """ + super().__init__(settings) + self.base_url = parse_obj_as(AnyHttpUrl, self.settings.BASE_URL) + self.auth = (self.settings.USERNAME, self.settings.PASSWORD) + self._client = None + + @property + def client(self) -> AsyncClient: + """Create a `httpx.AsyncClient` if it doesn't exist.""" + if not self._client: + headers = self.settings.HEADERS.dict(by_alias=True) + self._client = AsyncClient(auth=self.auth, headers=headers) + return self._client + + async def status(self) -> DataBackendStatus: + """HTTP backend check for server status.""" + status_url = urljoin(self.base_url, self.settings.STATUS_ENDPOINT) + try: + response = await self.client.get(status_url) + response.raise_for_status() + except RequestError: + self.logger.error("Unable to request the server") + return DataBackendStatus.AWAY + except HTTPStatusError: + self.logger.error("Response raised an HTTP status of 4xx or 5xx") + return DataBackendStatus.ERROR + + return DataBackendStatus.OK + + async def read( # noqa: PLR0913 + self, + query: Optional[Union[str, LRSStatementsQuery]] = None, + target: Optional[str] = None, + chunk_size: Optional[int] = None, + raw_output: bool = False, + ignore_errors: bool = False, + prefetch: Optional[PositiveInt] = None, + max_statements: Optional[PositiveInt] = None, + ) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]: + """Get statements from LRS `target` endpoint. + + The `read` method defined in the LRS specification returns `statements` array + and `more` IRL. The latter is required when the returned `statement` list has + been limited. + + Args: + query (str, LRSStatementsQuery): The query to select records to read. + target (str): Endpoint from which to read data (e.g. `/statements`). + If target is `None`, `/xAPI/statements` default endpoint is used. + chunk_size (int or None): The number of records or bytes to read in one + batch, depending on whether the records are dictionaries or bytes. 
+ raw_output (bool): Controls whether to yield bytes or dictionaries. + If the records are dictionaries and `raw_output` is set to `True`, they + are encoded as JSON. + If the records are bytes and `raw_output` is set to `False`, they are + decoded as JSON by line. + ignore_errors (bool): If `True`, errors during the read operation + are ignored and logged. If `False` (default), a `BackendException` + is raised if an error occurs. + prefetch (int): The number of records to prefetch (queue) while yielding. + If `prefetch` is `None` it defaults to `1` - no records are prefetched. + max_statements: The maximum number of statements to yield. + """ + statements = super().read( + query, + target, + chunk_size, + raw_output, + ignore_errors, + prefetch, + max_statements, + ) + async for statement in statements: + yield statement + + async def _read_bytes( + self, + query: LRSStatementsQuery, + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + ) -> AsyncIterator[bytes]: + """Method called by `self.read` yielding bytes. See `self.read`.""" + statements = self._read_dicts(query, target, chunk_size, ignore_errors) + async for statement in async_parse_dict_to_bytes( + statements, self.settings.LOCALE_ENCODING, ignore_errors, self.logger + ): + yield statement + + async def _read_dicts( + self, + query: LRSStatementsQuery, + target: Optional[str], + chunk_size: int, + ignore_errors: bool, # noqa: ARG002 + ) -> AsyncIterator[dict]: + """Method called by `self.read` yielding dictionaries. See `self.read`.""" + if not target: + target = self.settings.STATEMENTS_ENDPOINT + + if query.limit: + self.logger.warning( + "The limit query parameter value is overwritten by the chunk_size " + "parameter value." 
+ ) + + query.limit = chunk_size + + # Create request URL + target = ParseResult( + scheme=urlparse(self.base_url).scheme, + netloc=urlparse(self.base_url).netloc, + path=target, + query="", + params="", + fragment="", + ).geturl() + + statements = self._fetch_statements( + target=target, + query_params=query.dict(exclude_none=True, exclude_unset=True), + ) + + # Iterate through results + try: + async for statement in statements: + yield statement + except HTTPError as error: + msg = "Failed to fetch statements: %s" + self.logger.error(msg, error) + raise BackendException(msg % (error,)) from error + + async def write( # noqa: PLR0913 + self, + data: Union[IOBase, Iterable[bytes], Iterable[dict]], + target: Optional[str] = None, + chunk_size: Optional[int] = None, + ignore_errors: bool = False, + operation_type: Optional[BaseOperationType] = None, + concurrency: Optional[int] = None, + ) -> int: + """Write `data` records to the `target` endpoint and return their count. + + Args: + data: (Iterable): The data to write. + target (str or None): Endpoint in which to write data (e.g. `/statements`). + If `target` is `None`, `/xAPI/statements` default endpoint is used. + chunk_size (int or None): The number of records or bytes to write in one + batch, depending on whether `data` contains dictionaries or bytes. + If `chunk_size` is `None`, a default value is used instead. + ignore_errors (bool): If `True`, errors during the write operation + are ignored and logged. If `False` (default), a `BackendException` + is raised if an error occurs. + operation_type (BaseOperationType or None): The mode of the write operation. + If `operation_type` is `None`, the `default_operation_type` is used + instead. See `BaseOperationType`. + concurrency (int): The number of chunks to write concurrently. + If `None` it defaults to `1`. 
+ """ + return await super().write( + data, target, chunk_size, ignore_errors, operation_type, concurrency + ) + + async def _write_bytes( # noqa: PLR0913 + self, + data: Iterable[bytes], + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + operation_type: BaseOperationType, + ) -> int: + """Method called by `self.write` writing bytes. See `self.write`.""" + statements = parse_iterable_to_dict(data, ignore_errors, self.logger) + return await self._write_dicts( + statements, target, chunk_size, ignore_errors, operation_type + ) + + async def _write_dicts( # noqa: PLR0913 + self, + data: Iterable[dict], + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + operation_type: BaseOperationType, # noqa: ARG002 + ) -> int: + """Method called by `self.write` writing dictionaries. See `self.write`.""" + if not target: + target = self.settings.STATEMENTS_ENDPOINT + + target = ParseResult( + scheme=urlparse(self.base_url).scheme, + netloc=urlparse(self.base_url).netloc, + path=target, + query="", + params="", + fragment="", + ).geturl() + + self.logger.debug( + "Start writing to the %s endpoint (chunk size: %s)", target, chunk_size + ) + + count = 0 + for chunk in iter_by_batch(data, chunk_size): + count += await self._post_and_raise_for_status(target, chunk, ignore_errors) + + self.logger.debug("Posted %d statements", count) + return count + + async def close(self) -> None: + """Close the `httpx.AsyncClient`. + + Raise: + BackendException: If a failure occurs during the close operation. 
+ """ + if not self._client: + self.logger.warning("No backend client to close.") + return + + await self.client.aclose() + + async def _fetch_statements(self, target, query_params: dict): + """Fetch statements from a LRS.""" + while True: + response = await self.client.get(target, params=query_params) + response.raise_for_status() + statements_response = StatementResponse(**response.json()) + statements = statements_response.statements + if isinstance(statements, dict): + statements = [statements] + + for statement in statements: + yield statement + + if not statements_response.more: + break + + query_params.update(parse_qs(urlparse(statements_response.more).query)) + + async def _post_and_raise_for_status(self, target, chunk, ignore_errors): + """POST chunk of statements to `target` and return the number of insertions.""" + try: + request = await self.client.post(target, json=chunk) + request.raise_for_status() + return len(chunk) + except HTTPError as error: + msg = "Failed to post statements: %s" + if ignore_errors: + self.logger.warning(msg, error) + return 0 + self.logger.error(msg, error) + raise BackendException(msg % (error,)) from error diff --git a/src/ralph/backends/data/fs.py b/src/ralph/backends/data/fs.py index 228ec0e5f..f8db75979 100644 --- a/src/ralph/backends/data/fs.py +++ b/src/ralph/backends/data/fs.py @@ -321,8 +321,8 @@ def _write_bytes( # noqa: PLR0913 msg = "Target file not specified; using random file name: %s" self.logger.info(msg, target) - target = Path(target) - path = target if target.is_absolute() else self.default_directory / target + path = Path(target) + path = path if path.is_absolute() else self.default_directory / path if operation_type in [BaseOperationType.CREATE, BaseOperationType.INDEX]: if path.is_file(): @@ -363,6 +363,7 @@ def _write_bytes( # noqa: PLR0913 "timestamp": now(), } ) + self.logger.debug("Written %s with success", path.absolute()) return 1 def close(self) -> None: diff --git a/src/ralph/backends/data/lrs.py 
b/src/ralph/backends/data/lrs.py new file mode 100644 index 000000000..479321692 --- /dev/null +++ b/src/ralph/backends/data/lrs.py @@ -0,0 +1,319 @@ +"""LRS data backend for Ralph.""" + +from io import IOBase +from typing import Iterable, Iterator, List, Optional, Union +from urllib.parse import ParseResult, parse_qs, urljoin, urlparse + +from httpx import Client, HTTPError, HTTPStatusError, RequestError +from pydantic import AnyHttpUrl, BaseModel, Field, PositiveInt, parse_obj_as + +from ralph.backends.data.base import ( + BaseDataBackend, + BaseDataBackendSettings, + BaseOperationType, + DataBackendStatus, + Loggable, + Writable, +) +from ralph.backends.lrs.base import LRSStatementsQuery +from ralph.conf import BaseSettingsConfig, HeadersParameters +from ralph.exceptions import BackendException +from ralph.utils import iter_by_batch, parse_dict_to_bytes, parse_iterable_to_dict + + +class LRSHeaders(HeadersParameters): + """Pydantic model for LRS headers.""" + + X_EXPERIENCE_API_VERSION: str = Field("1.0.3", alias="X-Experience-API-Version") + CONTENT_TYPE: str = Field("application/json", alias="content-type") + + +class LRSDataBackendSettings(BaseDataBackendSettings): + """LRS data backend default configuration. + + Attributes: + BASE_URL (AnyHttpUrl): LRS server URL. + USERNAME (str): Basic auth username for LRS authentication. + PASSWORD (str): Basic auth password for LRS authentication. + HEADERS (dict): Headers defined for the LRS server connection. + LOCALE_ENCODING (str): The encoding used for reading statements. + READ_CHUNK_SIZE (int): The default chunk size for reading statements. + STATUS_ENDPOINT (str): Endpoint used to check server status. + STATEMENTS_ENDPOINT (str): Default endpoint for LRS statements resource. + WRITE_CHUNK_SIZE (int): The default chunk size for writing statements. 
+ """ + + class Config(BaseSettingsConfig): + """Pydantic Configuration.""" + + env_prefix = "RALPH_BACKENDS__DATA__LRS__" + + BASE_URL: AnyHttpUrl = Field("http://0.0.0.0:8100") + USERNAME: str = "ralph" + PASSWORD: str = "secret" + HEADERS: LRSHeaders = LRSHeaders() + STATUS_ENDPOINT: str = "/__heartbeat__" + STATEMENTS_ENDPOINT: str = "/xAPI/statements" + + +class StatementResponse(BaseModel): + """Pydantic model for `get` statements response.""" + + statements: Union[List[dict], dict] + more: Optional[str] + + +class LRSDataBackend( + BaseDataBackend[LRSDataBackendSettings, LRSStatementsQuery], Writable, Loggable +): + """LRS data backend.""" + + name = "lrs" + default_operation_type = BaseOperationType.CREATE + unsupported_operation_types = { + BaseOperationType.APPEND, + BaseOperationType.UPDATE, + BaseOperationType.DELETE, + } + + def __init__(self, settings: Optional[LRSDataBackendSettings] = None) -> None: + """Instantiate the LRS HTTP (basic auth) backend client. + + Args: + settings (LRSDataBackendSettings or None): The LRS data backend settings. + If `settings` is `None`, a default settings instance is used instead. 
+ """ + super().__init__(settings) + self.base_url = parse_obj_as(AnyHttpUrl, self.settings.BASE_URL) + self.auth = (self.settings.USERNAME, self.settings.PASSWORD) + self._client = None + + @property + def client(self) -> Client: + """Create a `httpx.Client` if it doesn't exist.""" + if not self._client: + headers = self.settings.HEADERS.dict(by_alias=True) + self._client = Client(auth=self.auth, headers=headers) + return self._client + + def status(self) -> DataBackendStatus: + """HTTP backend check for server status.""" + status_url = urljoin(self.base_url, self.settings.STATUS_ENDPOINT) + try: + response = self.client.get(status_url) + response.raise_for_status() + except RequestError: + self.logger.error("Unable to request the server") + return DataBackendStatus.AWAY + except HTTPStatusError: + self.logger.error("Response raised an HTTP status of 4xx or 5xx") + return DataBackendStatus.ERROR + + return DataBackendStatus.OK + + def read( # noqa: PLR0913 + self, + query: Optional[Union[str, LRSStatementsQuery]] = None, + target: Optional[str] = None, + chunk_size: Optional[int] = None, + raw_output: bool = False, + ignore_errors: bool = False, + max_statements: Optional[PositiveInt] = None, + ) -> Union[Iterator[bytes], Iterator[dict]]: + """Get statements from LRS `target` endpoint. + + The `read` method defined in the LRS specification returns `statements` array + and `more` IRL. The latter is required when the returned `statement` list has + been limited. + + Args: + query (str, LRSStatementsQuery): The query to select records to read. + target (str): Endpoint from which to read data (e.g. `/statements`). + If target is `None`, `/xAPI/statements` default endpoint is used. + chunk_size (int or None): The number of records or bytes to read in one + batch, depending on whether the records are dictionaries or bytes. + raw_output (bool): Controls whether to yield bytes or dictionaries. 
+ If the records are dictionaries and `raw_output` is set to `True`, they + are encoded as JSON. + If the records are bytes and `raw_output` is set to `False`, they are + decoded as JSON by line. + ignore_errors (bool): If `True`, errors during the read operation + are ignored and logged. If `False` (default), a `BackendException` + is raised if an error occurs. + max_statements: The maximum number of statements to yield. + """ + yield from super().read( + query, target, chunk_size, raw_output, ignore_errors, max_statements + ) + + def _read_bytes( + self, + query: LRSStatementsQuery, + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + ) -> Iterator[bytes]: + """Method called by `self.read` yielding bytes. See `self.read`.""" + statements = self._read_dicts(query, target, chunk_size, ignore_errors) + yield from parse_dict_to_bytes( + statements, self.settings.LOCALE_ENCODING, ignore_errors, self.logger + ) + + def _read_dicts( + self, + query: LRSStatementsQuery, + target: Optional[str], + chunk_size: int, + ignore_errors: bool, # noqa: ARG002 + ) -> Iterator[dict]: + """Method called by `self.read` yielding dictionaries. See `self.read`.""" + if not target: + target = self.settings.STATEMENTS_ENDPOINT + + if query.limit: + self.logger.warning( + "The limit query parameter value is overwritten by the chunk_size " + "parameter value." 
+ ) + + query.limit = chunk_size + + # Create request URL + target = ParseResult( + scheme=urlparse(self.base_url).scheme, + netloc=urlparse(self.base_url).netloc, + path=target, + query="", + params="", + fragment="", + ).geturl() + + statements = self._fetch_statements( + target=target, + query_params=query.dict(exclude_none=True, exclude_unset=True), + ) + + # Iterate through results + try: + for statement in statements: + yield statement + except HTTPError as error: + msg = "Failed to fetch statements: %s" + self.logger.error(msg, error) + raise BackendException(msg % (error,)) from error + + def write( # noqa: PLR0913 + self, + data: Union[IOBase, Iterable[bytes], Iterable[dict]], + target: Optional[str] = None, + chunk_size: Optional[int] = None, + ignore_errors: bool = False, + operation_type: Optional[BaseOperationType] = None, + ) -> int: + """Write `data` records to the `target` endpoint and return their count. + + Args: + data: (Iterable): The data to write. + target (str or None): Endpoint in which to write data (e.g. `/statements`). + If `target` is `None`, `/xAPI/statements` default endpoint is used. + chunk_size (int or None): The number of records or bytes to write in one + batch, depending on whether `data` contains dictionaries or bytes. + If `chunk_size` is `None`, a default value is used instead. + ignore_errors (bool): If `True`, errors during the write operation + are ignored and logged. If `False` (default), a `BackendException` + is raised if an error occurs. + operation_type (BaseOperationType or None): The mode of the write operation. + If `operation_type` is `None`, the `default_operation_type` is used + instead. See `BaseOperationType`. 
+ """ + return super().write(data, target, chunk_size, ignore_errors, operation_type) + + def _write_bytes( # noqa: PLR0913 + self, + data: Iterable[bytes], + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + operation_type: BaseOperationType, + ) -> int: + """Method called by `self.write` writing bytes. See `self.write`.""" + statements = parse_iterable_to_dict(data, ignore_errors, self.logger) + return self._write_dicts( + statements, target, chunk_size, ignore_errors, operation_type + ) + + def _write_dicts( # noqa: PLR0913 + self, + data: Iterable[dict], + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + operation_type: BaseOperationType, # noqa: ARG002 + ) -> int: + """Method called by `self.write` writing dictionaries. See `self.write`.""" + if not target: + target = self.settings.STATEMENTS_ENDPOINT + + target = ParseResult( + scheme=urlparse(self.base_url).scheme, + netloc=urlparse(self.base_url).netloc, + path=target, + query="", + params="", + fragment="", + ).geturl() + + self.logger.debug( + "Start writing to the %s endpoint (chunk size: %s)", target, chunk_size + ) + + count = 0 + for chunk in iter_by_batch(data, chunk_size): + count += self._post_and_raise_for_status(target, chunk, ignore_errors) + + self.logger.debug("Posted %d statements", count) + return count + + def close(self) -> None: + """Close the `httpx.Client`. + + Raise: + BackendException: If a failure occurs during the close operation. 
+ """ + if not self._client: + self.logger.warning("No backend client to close.") + return + + self.client.close() + + def _fetch_statements(self, target, query_params: dict): + """Fetch statements from a LRS.""" + while True: + response = self.client.get(target, params=query_params) + response.raise_for_status() + statements_response = StatementResponse(**response.json()) + statements = statements_response.statements + if isinstance(statements, dict): + statements = [statements] + + for statement in statements: + yield statement + + if not statements_response.more: + break + + query_params.update(parse_qs(urlparse(statements_response.more).query)) + + def _post_and_raise_for_status(self, target, chunk, ignore_errors): + """POST chunk of statements to `target` and return the number of insertions.""" + try: + request = self.client.post(target, content=chunk) + request.raise_for_status() + return len(chunk) + except HTTPError as error: + msg = "Failed to post statements: %s" + if ignore_errors: + self.logger.warning(msg, error) + return 0 + self.logger.error(msg, error) + raise BackendException(msg % (error,)) from error diff --git a/src/ralph/backends/http/async_lrs.py b/src/ralph/backends/http/async_lrs.py deleted file mode 100644 index b94e7ad34..000000000 --- a/src/ralph/backends/http/async_lrs.py +++ /dev/null @@ -1,392 +0,0 @@ -"""Async LRS HTTP backend for Ralph.""" - -import asyncio -import json -import logging -from itertools import chain -from typing import Iterable, Iterator, List, Optional, Union -from urllib.parse import ParseResult, parse_qs, urljoin, urlparse - -from httpx import AsyncClient, HTTPError, HTTPStatusError, RequestError -from pydantic import AnyHttpUrl, BaseModel, Field, parse_obj_as -from pydantic.types import PositiveInt - -from ralph.backends.lrs.base import LRSStatementsQuery -from ralph.conf import BaseSettingsConfig, HeadersParameters -from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import 
gather_with_limited_concurrency, iter_by_batch - -from .base import ( - BaseHTTPBackend, - BaseHTTPBackendSettings, - HTTPBackendStatus, - OperationType, - enforce_query_checks, -) - -logger = logging.getLogger(__name__) - - -class LRSHeaders(HeadersParameters): - """Pydantic model for LRS headers.""" - - X_EXPERIENCE_API_VERSION: str = Field("1.0.3", alias="X-Experience-API-Version") - CONTENT_TYPE: str = Field("application/json", alias="content-type") - - -class LRSHTTPBackendSettings(BaseHTTPBackendSettings): - """LRS HTTP backend default configuration. - - Attributes: - BASE_URL (AnyHttpUrl): LRS server URL. - USERNAME (str): Basic auth username for LRS authentication. - PASSWORD (str): Basic auth password for LRS authentication. - HEADERS (dict): Headers defined for the LRS server connection. - STATUS_ENDPOINT (str): Endpoint used to check server status. - STATEMENTS_ENDPOINT (str): Default endpoint for LRS statements resource. - """ - - class Config(BaseSettingsConfig): - """Pydantic Configuration.""" - - env_prefix = "RALPH_BACKENDS__HTTP__LRS__" - - BASE_URL: AnyHttpUrl = Field("http://0.0.0.0:8100") - USERNAME: str = "ralph" - PASSWORD: str = "secret" - HEADERS: LRSHeaders = LRSHeaders() - STATUS_ENDPOINT: str = "/__heartbeat__" - STATEMENTS_ENDPOINT: str = "/xAPI/statements" - - -class StatementResponse(BaseModel): - """Pydantic model for `get` statements response.""" - - statements: Union[List[dict], dict] - more: Optional[str] - - -class AsyncLRSHTTPBackend(BaseHTTPBackend): - """Asynchronous LRS HTTP backend.""" - - name = "async_lrs" - query = LRSStatementsQuery - default_operation_type = OperationType.CREATE - settings_class = LRSHTTPBackendSettings - - def __init__(self, settings: Optional[LRSHTTPBackendSettings] = None): - """Instantiate the LRS HTTP (basic auth) backend client. - - Args: - settings (LRSHTTPBackendSettings or None): The LRS HTTP backend settings. - If `settings` is `None`, a default settings instance is used instead. 
- """ - self.settings = settings if settings else self.settings_class() - - self.base_url = parse_obj_as(AnyHttpUrl, self.settings.BASE_URL) - self.auth = (self.settings.USERNAME, self.settings.PASSWORD) - - async def status(self) -> HTTPBackendStatus: - """HTTP backend check for server status.""" - status_url = urljoin(self.base_url, self.settings.STATUS_ENDPOINT) - - try: - async with AsyncClient() as client: - response = await client.get(status_url) - response.raise_for_status() - except RequestError: - msg = "Unable to request the server" - logger.error(msg) - return HTTPBackendStatus.AWAY - except HTTPStatusError: - msg = "Response raised an HTTP status of 4xx or 5xx" - logger.error(msg) - return HTTPBackendStatus.ERROR - - return HTTPBackendStatus.OK - - async def list( - self, - target: Optional[str] = None, - details: bool = False, # noqa: ARG002 - new: bool = False, # noqa: ARG002 - ) -> Iterator[Union[str, dict]]: - """Raise error for unsupported `list` method.""" - msg = "LRS HTTP backend does not support `list` method, cannot list from %s" - - logger.error(msg, target) - raise NotImplementedError(msg % target) - - @enforce_query_checks - async def read( # noqa: PLR0913 - self, - query: Optional[Union[str, LRSStatementsQuery]] = None, - target: Optional[str] = None, - chunk_size: Optional[PositiveInt] = 500, - raw_output: bool = False, - ignore_errors: bool = False, # noqa: ARG002 - greedy: bool = True, - max_statements: Optional[PositiveInt] = None, - ) -> Iterator[Union[bytes, dict]]: - """Get statements from LRS `target` endpoint. - - The `read` method defined in the LRS specification returns `statements` array - and `more` IRL. The latter is required when the returned `statement` list has - been limited. - - Args: - query (str, LRSStatementsQuery): The query to select records to read. - target (str): Endpoint from which to read data (e.g. `/statements`). - If target is `None`, `/xAPI/statements` default endpoint is used. 
- chunk_size (int or None): The number of records or bytes to read in one - batch, depending on whether the records are dictionaries or bytes. - raw_output (bool): Controls whether to yield bytes or dictionaries. - If the records are dictionaries and `raw_output` is set to `True`, they - are encoded as JSON. - If the records are bytes and `raw_output` is set to `False`, they are - decoded as JSON by line. - ignore_errors (bool): If `True`, errors during the read operation - are ignored and logged. If `False` (default), a `BackendException` - is raised if an error occurs. - greedy: If set to True, the client will fetch all available pages even - before the statements yielded by the generator are consumed. Caution: - this might potentially lead to large amounts of API calls and to the - memory filling up. - max_statements: The maximum number of statements to yield. - """ - if not target: - target = self.settings.STATEMENTS_ENDPOINT - - if query and query.limit: - logger.warning( - "The limit query parameter value is overwritten by the chunk_size " - "parameter value." 
- ) - - if query is None: - query = LRSStatementsQuery() - - query.limit = chunk_size - - # Create request URL - target = ParseResult( - scheme=urlparse(self.base_url).scheme, - netloc=urlparse(self.base_url).netloc, - path=target, - query="", - params="", - fragment="", - ).geturl() - - # Select the appropriate fetch function - if greedy: - statements_async_generator = self._greedy_fetch_statements( - target, raw_output, query.dict(exclude_none=True, exclude_unset=True) - ) - else: - statements_async_generator = self._fetch_statements( - target=target, - raw_output=raw_output, - query_params=query.dict(exclude_none=True, exclude_unset=True), - ) - - # Iterate through results - counter = 0 - try: - async for statement in statements_async_generator: - if max_statements and (counter >= max_statements): - break - yield statement - counter += 1 - except HTTPError as error: - msg = "Failed to fetch statements." - logger.error("%s. %s", msg, error) - raise BackendException(msg, *error.args) from error - - async def write( # noqa: PLR0913 - self, - data: Union[Iterable[bytes], Iterable[dict]], - target: Optional[str] = None, - chunk_size: Optional[PositiveInt] = 500, - ignore_errors: bool = False, - operation_type: Optional[OperationType] = None, - simultaneous: bool = False, - max_num_simultaneous: Optional[PositiveInt] = None, - ) -> int: - """Write `data` records to the `target` endpoint and return their count. - - Args: - data: (Iterable): The data to write. - target (str or None): Endpoint in which to write data (e.g. `/statements`). - If `target` is `None`, `/xAPI/statements` default endpoint is used. - chunk_size (int or None): The number of records or bytes to write in one - batch, depending on whether `data` contains dictionaries or bytes. - If `chunk_size` is `None`, a default value is used instead. - ignore_errors (bool): If `True`, errors during the write operation - are ignored and logged. 
If `False` (default), a `BackendException` - is raised if an error occurs. - operation_type (OperationType or None): The mode of the write operation. - If `operation_type` is `None`, the `default_operation_type` is used - instead. See `OperationType`. - simultaneous (bool): If `True`, chunks requests will be made concurrently. - If `False` (default), chunks will be sent sequentially - max_num_simultaneous (int or None): If simultaneous is `True`, the maximum - number of chunks to POST concurrently. If `None` (default), no limit is - set. - """ - data = iter(data) - try: - first_record = next(data) - except StopIteration: - logger.info("Data Iterator is empty; skipping write to target") - return 0 - - if not operation_type: - operation_type = self.default_operation_type - - if operation_type in ( - OperationType.APPEND, - OperationType.UPDATE, - OperationType.DELETE, - ): - msg = f"{operation_type.value} operation type is not supported." - logger.error(msg) - raise BackendParameterException(msg) - - if not target: - target = self.settings.STATEMENTS_ENDPOINT - - if not chunk_size: - chunk_size = 500 - - target = ParseResult( - scheme=urlparse(self.base_url).scheme, - netloc=urlparse(self.base_url).netloc, - path=target, - query="", - params="", - fragment="", - ).geturl() - - if (max_num_simultaneous is not None) and max_num_simultaneous < 1: - msg = "max_num_simultaneous must be a strictly positive integer" - logger.error(msg) - raise BackendParameterException(msg) - - if (not simultaneous) and (max_num_simultaneous is not None): - msg = "max_num_simultaneous can only be used with `simultaneous=True`" - logger.error(msg) - raise BackendParameterException(msg) - - # Gather only one chunk at a time when not using the simultaneous option - if not simultaneous: - max_num_simultaneous = 1 - - data = chain((first_record,), data) - - logger.debug( - "Start writing to the %s endpoint (chunk size: %s)", target, chunk_size - ) - - # Create tasks - tasks = set() - for chunk 
in iter_by_batch(data, chunk_size): - tasks.add(self._post_and_raise_for_status(target, chunk, ignore_errors)) - - # Run POST tasks - result = await gather_with_limited_concurrency(max_num_simultaneous, *tasks) - statements_count = sum(result) - - logger.debug("Posted %d statements", statements_count) - return statements_count - - async def _fetch_statements(self, target, raw_output, query_params: dict): - """Fetch statements from a LRS. Used in `read`.""" - async with AsyncClient( - auth=self.auth, headers=self.settings.HEADERS.dict(by_alias=True) - ) as client: - while True: - response = await client.get(target, params=query_params) - response.raise_for_status() - statements_response = StatementResponse.parse_obj(response.json()) - statements = statements_response.statements - statements = ( - [statements] if not isinstance(statements, list) else statements - ) - if raw_output: - for statement in statements: - yield bytes(json.dumps(statement), encoding="utf-8") - else: - for statement in statements: - yield statement - - if not statements_response.more: - break - - query_params.update(parse_qs(urlparse(statements_response.more).query)) - - async def _greedy_fetch_statements(self, target, raw_output, query_params: dict): - """Fetch as many statements as possible and yield when they are available. - - This may be used in the context of paginated results, to allow processing - of statements while the following page is being fetched. Implementation of - this function relies on the class method `_fetch_statements`, which must yield - statements. 
- """ - queue = asyncio.Queue() - - async def fetch_all_statements(queue): - """Fetch all statements and put them in a queue.""" - try: - async for statement in self._fetch_statements( - target=target, raw_output=raw_output, query_params=query_params - ): - await queue.put(statement) - # Re-raising exceptions is necessary as create_task fails silently - except Exception as exception: - # None signals that the queue is done - await queue.put(None) - raise exception - await queue.put(None) - - # Run fetch_all_statements in the background - task = asyncio.create_task(fetch_all_statements(queue)) - - # Yield statements as they arrive - while True: - statement = await queue.get() - if statement is None: - # Check for exception in fetch_all_statements - if task.exception(): - raise task.exception() - break - yield statement - - async def _post_and_raise_for_status(self, target, chunk, ignore_errors): - """POST chunk of statements to `target` and return the number of insertions. - - For use in `write`. - """ - async with AsyncClient(auth=self.auth, headers=self.settings.HEADERS) as client: - try: - request = await client.post( - # Encode data to allow async post - target, - content=json.dumps(list(chunk)).encode("utf-8"), - ) - except TypeError as error: - msg = f"Failed to encode JSON: {error}, for document {chunk}" - logger.error(msg) - if ignore_errors: - return 0 - raise BackendException(msg) from error - - try: - request.raise_for_status() - return len(chunk) - except HTTPError as error: - msg = "Failed to post statements" - logger.error("%s. 
%s", msg, error) - if ignore_errors: - return 0 - raise BackendException(msg, *error.args) from error diff --git a/src/ralph/backends/http/base.py b/src/ralph/backends/http/base.py deleted file mode 100644 index 4f3b212ec..000000000 --- a/src/ralph/backends/http/base.py +++ /dev/null @@ -1,151 +0,0 @@ -"""Base HTTP backend for Ralph.""" - -import functools -import logging -from abc import ABC, abstractmethod -from enum import Enum, unique -from typing import Iterator, List, Optional, Union - -from pydantic import BaseModel, BaseSettings, ValidationError -from pydantic.types import PositiveInt - -from ralph.conf import BaseSettingsConfig, core_settings -from ralph.exceptions import BackendParameterException - -logger = logging.getLogger(__name__) - - -class BaseHTTPBackendSettings(BaseSettings): - """Data backend default configuration.""" - - class Config(BaseSettingsConfig): - """Pydantic Configuration.""" - - env_prefix = "RALPH_BACKENDS__HTTP__" - env_file = ".env" - env_file_encoding = core_settings.LOCALE_ENCODING - - -@unique -class HTTPBackendStatus(Enum): - """HTTP backend statuses.""" - - OK = "ok" - AWAY = "away" - ERROR = "error" - - -class OperationType(Enum): - """Data backend operation types. - - Attributes: - INDEX (str): creates a new record with a specific ID. - CREATE (str): creates a new record without a specific ID. - DELETE (str): deletes an existing record. - UPDATE (str): updates or overwrites an existing record. - APPEND (str): creates or appends data to an existing record. 
- """ - - INDEX = "index" - CREATE = "create" - DELETE = "delete" - UPDATE = "update" - APPEND = "append" - - -def enforce_query_checks(method): - """Enforce query argument type checking for methods using it.""" - - @functools.wraps(method) - def wrapper(*args, **kwargs): - """Wrap method execution.""" - query = kwargs.pop("query", None) - self_ = args[0] - - return method(*args, query=self_.validate_query(query), **kwargs) - - return wrapper - - -class BaseQuery(BaseModel): - """Base query model.""" - - class Config: - """Base query model configuration.""" - - extra = "forbid" - - query_string: Optional[str] - - -class BaseHTTPBackend(ABC): - """Base HTTP backend interface.""" - - name = "base" - query = BaseQuery - - def validate_query( - self, query: Optional[Union[str, dict, BaseQuery]] = None - ) -> BaseQuery: - """Validate and transforms the query.""" - if query is None: - query = self.query() - - if isinstance(query, str): - query = self.query(query_string=query) - - if isinstance(query, dict): - try: - query = self.query(**query) - except ValidationError as err: - raise BackendParameterException( - "The 'query' argument is expected to be a " - f"{self.query.__name__} instance. {err.errors()}" - ) from err - - if not isinstance(query, self.query): - raise BackendParameterException( - "The 'query' argument is expected to be a " - f"{self.query.__name__} instance." - ) - - logger.debug("Query: %s", str(query)) - - return query - - @abstractmethod - async def list( - self, target: Optional[str] = None, details: bool = False, new: bool = False - ) -> Iterator[Union[str, dict]]: - """List containers in the data backend. 
E.g., collections, files, indexes.""" - - @abstractmethod - async def status(self) -> HTTPBackendStatus: - """Implement HTTP backend check for server status.""" - - @abstractmethod - @enforce_query_checks - async def read( # noqa: PLR0913 - self, - query: Optional[Union[str, BaseQuery]] = None, - target: Optional[str] = None, - chunk_size: Optional[PositiveInt] = 500, - raw_output: bool = False, - ignore_errors: bool = False, - greedy: bool = False, - max_statements: Optional[PositiveInt] = None, - ) -> Iterator[Union[bytes, dict]]: - """Yield records read from the HTTP response results.""" - - @abstractmethod - async def write( # noqa: PLR0913 - self, - data: Union[List[bytes], List[dict]], - target: Optional[str] = None, - chunk_size: Optional[PositiveInt] = 500, - ignore_errors: bool = False, - operation_type: Optional[OperationType] = None, - simultaneous: bool = False, - max_num_simultaneous: Optional[int] = None, - ) -> int: - """Write statements into the HTTP server given an input endpoint.""" diff --git a/src/ralph/backends/http/lrs.py b/src/ralph/backends/http/lrs.py deleted file mode 100644 index 1049f08fa..000000000 --- a/src/ralph/backends/http/lrs.py +++ /dev/null @@ -1,71 +0,0 @@ -"""LRS HTTP backend for Ralph.""" -import asyncio -from typing import Iterator, Union - -from ralph.backends.http.async_lrs import AsyncLRSHTTPBackend -from ralph.backends.http.base import HTTPBackendStatus - - -def _ensure_running_loop_uniqueness(func): - """Raise an error when methods are used in a running asyncio events loop.""" - - def wrap(*args, **kwargs): - """Wrapper for decorator function.""" - try: - loop = asyncio.get_running_loop() - except RuntimeError: - return func(*args, **kwargs) - if loop.is_running(): - raise RuntimeError( - f"This event loop is already running. You must use " - f"`AsyncLRSHTTPBackend.{func.__name__}` (instead of `LRSHTTPBackend." - f"{func.__name__}`), or run this code outside the current" - " event loop." 
- ) - return func(*args, **kwargs) - - return wrap - - -class LRSHTTPBackend(AsyncLRSHTTPBackend): - """LRS HTTP backend.""" - - name = "lrs" - - @_ensure_running_loop_uniqueness - def status(self, *args, **kwargs) -> HTTPBackendStatus: - """HTTP backend check for server status.""" - return asyncio.get_event_loop().run_until_complete( - super().status(*args, **kwargs) - ) - - @_ensure_running_loop_uniqueness - def list(self, *args, **kwargs) -> Iterator[Union[str, dict]]: - """Raise error for unsupported `list` method.""" - return asyncio.get_event_loop().run_until_complete( - super().list(*args, **kwargs) - ) - - @_ensure_running_loop_uniqueness - def read(self, *args, **kwargs) -> Iterator[Union[bytes, dict]]: - """Get statements from LRS `target` endpoint. - - See AsyncLRSHTTP.read for more information. - """ - async_statements_gen = super().read(*args, **kwargs) - loop = asyncio.get_event_loop() - try: - while True: - yield loop.run_until_complete(async_statements_gen.__anext__()) - except StopAsyncIteration: - pass - - @_ensure_running_loop_uniqueness - def write(self, *args, **kwargs) -> int: - """Write `data` records to the `target` endpoint and return their count. - - See AsyncLRSHTTP.write for more information. 
- """ - return asyncio.get_event_loop().run_until_complete( - super().write(*args, **kwargs) - ) diff --git a/src/ralph/backends/loader.py b/src/ralph/backends/loader.py index cda3c2ac6..62e935076 100644 --- a/src/ralph/backends/loader.py +++ b/src/ralph/backends/loader.py @@ -16,7 +16,6 @@ Listable, Writable, ) -from ralph.backends.http.base import BaseHTTPBackend from ralph.backends.lrs.base import BaseAsyncLRSBackend, BaseLRSBackend from ralph.backends.stream.base import BaseStreamBackend @@ -78,13 +77,11 @@ def get_cli_backends() -> Dict[str, Type]: """Return Ralph's backend classes for cli usage.""" dotted_paths = ( "ralph.backends.data", - "ralph.backends.http", "ralph.backends.stream", ) base_backends = ( BaseAsyncDataBackend, BaseDataBackend, - BaseHTTPBackend, BaseStreamBackend, ) return get_backends(dotted_paths, base_backends) @@ -97,7 +94,7 @@ def get_cli_write_backends() -> Dict[str, Type]: return { name: backend for name, backend in backends.items() - if issubclass(backend, (Writable, AsyncWritable, BaseHTTPBackend)) + if issubclass(backend, (Writable, AsyncWritable)) } diff --git a/src/ralph/cli.py b/src/ralph/cli.py index c26e3beef..bd0674c22 100644 --- a/src/ralph/cli.py +++ b/src/ralph/cli.py @@ -4,7 +4,7 @@ import logging import re import sys -from inspect import isasyncgen, isclass, iscoroutinefunction +from inspect import isasyncgen, isclass from pathlib import Path from tempfile import NamedTemporaryFile from typing import Any, Callable, Dict, Optional, Type, Union @@ -29,11 +29,11 @@ from ralph import __version__ as ralph_version from ralph.backends.data.base import ( + AsyncWritable, BaseAsyncDataBackend, BaseDataBackend, BaseOperationType, ) -from ralph.backends.http.base import BaseHTTPBackend from ralph.backends.loader import ( get_cli_backends, get_cli_list_backends, @@ -609,7 +609,7 @@ def convert(from_, to_, ignore_errors, fail_on_unknown, **conversion_set_kwargs) @RalphCLI.lazy_backends_options(get_cli_backends) 
@click.argument("archive", required=False) @click.option( - "-c", + "-s", "--chunk-size", type=int, default=None, @@ -678,18 +678,6 @@ def read( # noqa: PLR0913 click.echo(statement, nl=False) elif isinstance(backend, BaseStreamBackend): backend.stream(sys.stdout.buffer) - elif isinstance(backend, BaseHTTPBackend): - if query is not None: - query = backend.query(query=query) - for statement in backend.read( - target=target, query=query, chunk_size=chunk_size - ): - click.echo( - bytes( - json.dumps(statement) if isinstance(statement, dict) else statement, - encoding="utf-8", - ) - ) else: msg = "Cannot find an implemented backend type for backend %s" logger.error(msg, backend) @@ -698,19 +686,19 @@ def read( # noqa: PLR0913 @RalphCLI.lazy_backends_options(get_cli_write_backends) @click.option( - "-c", + "-t", + "--target", + type=str, + default=None, + help="The target container to write into", +) +@click.option( + "-s", "--chunk-size", type=int, default=None, help="Get events by chunks of size #", ) -@click.option( - "-f", - "--force", - default=False, - is_flag=True, - help="Overwrite existing archives or records", -) @click.option( "-I", "--ignore-errors", @@ -719,37 +707,26 @@ def read( # noqa: PLR0913 help="Continue writing regardless of raised errors", ) @click.option( - "-s", - "--simultaneous", - default=False, - is_flag=True, - help="With HTTP backend, POST all chunks simultaneously (instead of sequentially)", -) -@click.option( - "-m", - "--max-num-simultaneous", - type=int, - default=-1, - help=( - "The maximum number of chunks to send at once, when using `--simultaneous`. " - "Use `-1` to not set a limit." 
- ), + "-o", + "--operation-type", + type=click.Choice([op_type.value for op_type in BaseOperationType]), + metavar="OP_TYPE", + required=False, + help="Either index, create, delete, update or append", ) @click.option( - "-t", - "--target", - type=str, - default=None, - help="The target container to write into", + "-c", + "--concurrency", + default=1, + help="Number of chunks to write concurrently. (async backends only)", ) def write( # noqa: PLR0913 backend, + target, chunk_size, - force, ignore_errors, - simultaneous, - max_num_simultaneous, - target, + operation_type, + concurrency, **options, ): """Write an archive to a configured backend.""" @@ -757,40 +734,23 @@ def write( # noqa: PLR0913 logger.debug("Backend parameters: %s", options) - if max_num_simultaneous == 1: - max_num_simultaneous = None - backend_class = get_backend_class(get_cli_write_backends(), backend) backend = get_backend_instance(backend_class, options) - if isinstance(backend, (BaseDataBackend, BaseAsyncDataBackend)): - writer = ( - execute_async(backend.write) - if iscoroutinefunction(backend.write) - else backend.write - ) - writer( - data=sys.stdin.buffer, - target=target, - chunk_size=chunk_size, - ignore_errors=ignore_errors, - operation_type=BaseOperationType.UPDATE - if force - else BaseOperationType.INDEX, - ) - elif isinstance(backend, BaseHTTPBackend): - backend.write( - target=target, - data=sys.stdin.buffer, - chunk_size=chunk_size, - ignore_errors=ignore_errors, - simultaneous=simultaneous, - max_num_simultaneous=max_num_simultaneous, - ) - else: - msg = "Cannot find an implemented backend type for backend %s" - logger.error(msg, backend) - raise UnsupportedBackendException(msg, backend) + writer = backend.write + async_options = {} + if isinstance(backend, AsyncWritable): + writer = execute_async(backend.write) + async_options = {"concurrency": concurrency} + + writer( + data=sys.stdin.buffer, + target=target, + chunk_size=chunk_size, + ignore_errors=ignore_errors, + 
operation_type=BaseOperationType(operation_type) if operation_type else None, + **async_options, + ) @RalphCLI.lazy_backends_options(get_cli_list_backends, name="list") diff --git a/tests/backends/data/test_async_lrs.py b/tests/backends/data/test_async_lrs.py new file mode 100644 index 000000000..e504b287b --- /dev/null +++ b/tests/backends/data/test_async_lrs.py @@ -0,0 +1,766 @@ +"""Tests for Ralph LRS data backend.""" + +import asyncio +import json +import logging +import re +import time +from functools import partial + +import httpx +import pytest +from httpx import HTTPStatusError, RequestError +from pydantic import AnyHttpUrl, parse_obj_as +from pytest_httpx import HTTPXMock + +from ralph.backends.data.async_lrs import AsyncLRSDataBackend +from ralph.backends.data.base import BaseOperationType, DataBackendStatus +from ralph.backends.data.lrs import LRSDataBackend, LRSDataBackendSettings, LRSHeaders +from ralph.backends.lrs.base import LRSStatementsQuery +from ralph.exceptions import BackendException, BackendParameterException + +from ...helpers import mock_statement + + +def test_backends_data_async_lrs_default_instantiation(monkeypatch, fs, lrs_backend): + """Test the `LRSDataBackend` default instantiation.""" + fs.create_file(".env") + backend_settings_names = [ + "BASE_URL", + "USERNAME", + "PASSWORD", + "HEADERS", + "LOCALE_ENCODING", + "READ_CHUNK_SIZE", + "STATUS_ENDPOINT", + "STATEMENTS_ENDPOINT", + "WRITE_CHUNK_SIZE", + ] + for name in backend_settings_names: + monkeypatch.delenv(f"RALPH_BACKENDS__DATA__LRS__{name}", raising=False) + + assert AsyncLRSDataBackend.name == "async_lrs" + assert LRSDataBackend.name == "lrs" + backend_class = lrs_backend().__class__ + assert backend_class.settings_class == LRSDataBackendSettings + backend = backend_class() + assert backend.query_class == LRSStatementsQuery + assert backend.base_url == parse_obj_as(AnyHttpUrl, "http://0.0.0.0:8100") + assert backend.auth == ("ralph", "secret") + assert 
backend.settings.HEADERS == LRSHeaders() + assert backend.settings.LOCALE_ENCODING == "utf8" + assert backend.settings.READ_CHUNK_SIZE == 500 + assert backend.settings.STATUS_ENDPOINT == "/__heartbeat__" + assert backend.settings.STATEMENTS_ENDPOINT == "/xAPI/statements" + assert backend.settings.WRITE_CHUNK_SIZE == 500 + + # Test overriding default values with environment variables. + monkeypatch.setenv("RALPH_BACKENDS__DATA__LRS__USERNAME", "foo") + backend = backend_class() + assert backend.auth == ("foo", "secret") + + +def test_backends_data_async_lrs_instantiation_with_settings(lrs_backend): + """Test the LRS backend instantiation with settings.""" + + headers = LRSHeaders( + X_EXPERIENCE_API_VERSION="1.0.3", CONTENT_TYPE="application/json" + ) + settings = LRSDataBackendSettings( + BASE_URL="http://fake-lrs.com", + USERNAME="user", + PASSWORD="pass", + HEADERS=headers, + LOCALE_ENCODING="utf-16", + STATUS_ENDPOINT="/fake-status-endpoint", + STATEMENTS_ENDPOINT="/xAPI/statements", + READ_CHUNK_SIZE=5000, + WRITE_CHUNK_SIZE=5000, + ) + + assert AsyncLRSDataBackend.settings_class == LRSDataBackendSettings + backend = AsyncLRSDataBackend(settings) + assert backend.query_class == LRSStatementsQuery + assert isinstance(backend.base_url, AnyHttpUrl) + assert backend.auth == ("user", "pass") + assert backend.settings.HEADERS.CONTENT_TYPE == "application/json" + assert backend.settings.HEADERS.X_EXPERIENCE_API_VERSION == "1.0.3" + assert backend.settings.LOCALE_ENCODING == "utf-16" + assert backend.settings.STATUS_ENDPOINT == "/fake-status-endpoint" + assert backend.settings.STATEMENTS_ENDPOINT == "/xAPI/statements" + assert backend.settings.READ_CHUNK_SIZE == 5000 + assert backend.settings.WRITE_CHUNK_SIZE == 5000 + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_status_with_successful_request( + httpx_mock: HTTPXMock, lrs_backend +): + """Test the LRS backend status method returns `OK` when the request is + successful. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + # Mock GET response of HTTPX + url = "http://fake-lrs.com/__heartbeat__" + httpx_mock.add_response(url=url, method="GET", status_code=200) + status = await backend.status() + assert status == DataBackendStatus.OK + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_status_with_request_error( + httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the LRS backend status method returns `AWAY` when a `RequestError` + exception is caught. + """ + backend: AsyncLRSDataBackend = lrs_backend() + httpx_mock.add_exception(RequestError("Test Request Error")) + with caplog.at_level(logging.ERROR): + status = await backend.status() + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + "Unable to request the server", + ) in caplog.record_tuples + + assert status == DataBackendStatus.AWAY + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_status_with_http_status_error( + httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the LRS backend status method returns `ERROR` when an `HTTPStatusError` + is caught. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + exception = HTTPStatusError("Test HTTP Status Error", request=None, response=None) + httpx_mock.add_exception(exception) + with caplog.at_level(logging.ERROR): + status = await backend.status() + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + "Response raised an HTTP status of 4xx or 5xx", + ) in caplog.record_tuples + + assert status == DataBackendStatus.ERROR + await backend.close() + + +@pytest.mark.parametrize("max_statements", [None, 2, 4, 8]) +@pytest.mark.anyio +async def test_backends_data_async_lrs_read_max_statements( + httpx_mock: HTTPXMock, max_statements: int, lrs_backend +): + """Test the LRS backend `read` method `max_statements` argument.""" + statements = [mock_statement() for _ in range(3)] + response = {"statements": statements, "more": "/xAPI/statements/?pit_id=pit_id"} + more_response = {"statements": statements} + all_statements = statements * 2 + + # Mock GET response of HTTPX for target and "more" target without query parameter + url = "http://fake-lrs.com/xAPI/statements/?limit=500" + httpx_mock.add_response( + url=url, + method="GET", + json=response, + ) + # Given max_statements equal to two - already three statements were retrieved in the + # first request, thus no more requests are expected. 
+ if not max_statements == 2: + url = "http://fake-lrs.com/xAPI/statements/?limit=500&pit_id=pit_id" + httpx_mock.add_response(url=url, method="GET", json=more_response) + + backend: AsyncLRSDataBackend = lrs_backend() + result = [x async for x in backend.read(max_statements=max_statements)] + # Assert that result is of the proper length + assert result == all_statements[:max_statements] + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize("prefetch", [1, 10]) +async def test_backends_data_async_lrs_read_without_target( + prefetch: int, httpx_mock: HTTPXMock, lrs_backend +): + """Test that the LRS backend `read` method without target parameter value fetches + statements from '/xAPI/statements/' default endpoint. + """ + backend: AsyncLRSDataBackend = lrs_backend() + response = {"statements": mock_statement()} + url = "http://fake-lrs.com/xAPI/statements/?limit=500" + httpx_mock.add_response(url=url, method="GET", json=response) + result = [x async for x in backend.read(prefetch=prefetch)] + assert result == [response["statements"]] + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize("prefetch", [1, 10]) +async def test_backends_data_async_lrs_read_backend_error( + httpx_mock: HTTPXMock, caplog, prefetch: int, lrs_backend +): + """Test the LRS backend `read` method raises a `BackendException` when the server + returns an error. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + url = "http://fake-lrs.com/xAPI/statements/?limit=500" + httpx_mock.add_response(url=url, method="GET", status_code=500) + error = ( + "Failed to fetch statements: Server error '500 Internal Server Error' for url " + "'http://fake-lrs.com/xAPI/statements/?limit=500'\nFor more information check: " + "https://httpstatuses.com/500" + ) + with pytest.raises(BackendException, match=re.escape(error)): + with caplog.at_level(logging.ERROR): + _ = [x async for x in backend.read(prefetch=prefetch)] + + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + error, + ) in caplog.record_tuples + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize("prefetch", [1, 10]) +async def test_backends_data_async_lrs_read_without_pagination( + httpx_mock: HTTPXMock, prefetch: int, lrs_backend +): + """Test the LRS backend `read` method when the request on the target endpoint + returns statements without pagination. + """ + backend: AsyncLRSDataBackend = lrs_backend() + statements = [ + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), + ] + response = {"statements": statements} + + # Mock GET response of HTTPX without query parameter + url = "http://fake-lrs.com/xAPI/statements/?limit=500" + httpx_mock.add_response(url=url, method="GET", json=response) + + # Return an iterable of dict + result = [x async for x in backend.read(raw_output=False, prefetch=prefetch)] + assert result == statements + + # Return an iterable of bytes + result = [x async for x in backend.read(raw_output=True, prefetch=prefetch)] + assert result == [ + f"{json.dumps(statement)}\n".encode("utf-8") for statement in statements + ] + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize("prefetch", [1, 10]) +async def 
test_backends_data_async_lrs_read_without_pagination_with_query( + httpx_mock: HTTPXMock, prefetch: int, lrs_backend +): + """Test the LRS backend `read` method with a query when the request on the target + endpoint returns statements without pagination. + """ + backend: AsyncLRSDataBackend = lrs_backend() + verb_id = "https://w3id.org/xapi/video/verbs/played" + query = LRSStatementsQuery(verb=verb_id) + statements = [ + mock_statement(verb={"id": verb_id}), + mock_statement(verb={"id": verb_id}), + ] + response = {"statements": statements} + # Mock GET response of HTTPX with query parameter + url = f"http://fake-lrs.com/xAPI/statements/?limit=500&verb={verb_id}" + httpx_mock.add_response(url=url, method="GET", json=response) + # Return an iterable of dict + reader = backend.read(query=query, raw_output=False, prefetch=prefetch) + result = [x async for x in reader] + assert result == statements + + # Return an iterable of bytes + reader = backend.read(query=query, raw_output=True, prefetch=prefetch) + result = [x async for x in reader] + assert result == [ + f"{json.dumps(statement)}\n".encode("utf-8") for statement in statements + ] + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_read_with_pagination( + httpx_mock: HTTPXMock, lrs_backend +): + """Test the LRS backend `read` method when the request on the target endpoint + returns statements with pagination. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + statements = [ + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/initialized"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), + ] + more_statements = [ + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/seeked"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), + ] + all_statements = statements + more_statements + response = {"statements": statements, "more": "/xAPI/statements/?pit_id=pit_id"} + more_response = {"statements": more_statements} + + url = "http://fake-lrs.com/xAPI/statements/?limit=500" + httpx_mock.add_response(url=url, method="GET", json=response) + url = "http://fake-lrs.com/xAPI/statements/?limit=500&pit_id=pit_id" + httpx_mock.add_response(url=url, method="GET", json=more_response) + + # Return an iterable of dict + result = [x async for x in backend.read(raw_output=False)] + assert result == all_statements + + # Return an iterable of bytes + result = [x async for x in backend.read(raw_output=True)] + assert result == [ + f"{json.dumps(statement)}\n".encode("utf-8") for statement in all_statements + ] + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_read_with_pagination_with_query( + httpx_mock: HTTPXMock, lrs_backend +): + """Test the LRS backend `read` method with a query when the request on the target + endpoint returns statements with pagination. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + verb_id = "https://w3id.org/xapi/video/verbs/played" + query = LRSStatementsQuery(verb=verb_id) + statements = [mock_statement(verb={"id": verb_id})] + more_statements = [ + mock_statement(verb={"id": verb_id}), + ] + all_statements = statements + more_statements + response = {"statements": statements, "more": "/xAPI/statements/?pit_id=pit_id"} + more_response = {"statements": more_statements} + + # Mock GET response of HTTPX with query parameter + url = f"http://fake-lrs.com/xAPI/statements/?limit=500&verb={verb_id}" + httpx_mock.add_response(url=url, method="GET", json=response) + url = f"http://fake-lrs.com/xAPI/statements/?limit=500&verb={verb_id}&pit_id=pit_id" + httpx_mock.add_response(url=url, method="GET", json=more_response) + + # Return an iterable of dict + reader = backend.read(query=query, raw_output=False) + result = [x async for x in reader] + assert result == all_statements + + # Return an iterable of bytes + reader = backend.read(query=query, raw_output=True) + result = [x async for x in reader] + assert result == [ + f"{json.dumps(statement)}\n".encode("utf-8") for statement in all_statements + ] + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "chunk_size,concurrency,statement_count_logs", + [ + (500, 1, [6]), + (2, 1, [6]), + (500, 2, [6]), + (2, 2, [2, 2, 2]), + (1, 2, [1, 1, 1, 1, 1, 1]), + ], +) +async def test_backends_data_async_lrs_write_without_operation( # noqa: PLR0913 + httpx_mock: HTTPXMock, + lrs_backend, + caplog, + chunk_size, + concurrency, + statement_count_logs, +): + """Test the LRS backend `write` method, given no operation_type should POST to + the LRS server. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + statements = [mock_statement() for _ in range(6)] + + # Mock HTTPX POST + url = "http://fake-lrs.com/xAPI/statements/" + httpx_mock.add_response(url=url, method="POST", json=statements) + + with caplog.at_level(logging.DEBUG): + assert await backend.write( + data=statements, + chunk_size=chunk_size, + concurrency=concurrency, + ) == len(statements) + + # If no chunk_size is provided, a default value (500) should be used. + if chunk_size is None: + chunk_size = 500 + + assert ( + f"ralph.backends.data.{backend.name}", + logging.DEBUG, + "Start writing to the http://fake-lrs.com/xAPI/statements/ endpoint " + f"(chunk size: {chunk_size})", + ) in caplog.record_tuples + + if isinstance(backend, AsyncLRSDataBackend): + # Only async backends support `concurrency`. + log_records = list(caplog.record_tuples) + for statement_count_log in statement_count_logs: + log_records.remove( + ( + f"ralph.backends.data.{backend.name}", + logging.DEBUG, + f"Posted {statement_count_log} statements", + ) + ) + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_without_data(caplog, lrs_backend): + """Test the LRS backend `write` method returns null when no data to write + in the target endpoint are given. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + with caplog.at_level(logging.INFO): + result = await backend.write([]) + + assert ( + f"ralph.backends.data.{backend.name}", + logging.INFO, + "Data Iterator is empty; skipping write to target", + ) in caplog.record_tuples + + assert result == 0 + await backend.close() + + +@pytest.mark.parametrize( + "operation_type,error_msg", + [ + (BaseOperationType.APPEND, "Append operation_type is not allowed"), + (BaseOperationType.UPDATE, "Update operation_type is not allowed"), + (BaseOperationType.DELETE, "Delete operation_type is not allowed"), + ], +) +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_with_unsupported_operation( + lrs_backend, operation_type, caplog, error_msg +): + """Test the LRS backend `write` method, given an unsupported` `operation_type`, + should raise a `BackendParameterException`. + """ + backend: AsyncLRSDataBackend = lrs_backend() + with pytest.raises(BackendParameterException, match=error_msg): + with caplog.at_level(logging.ERROR): + await backend.write(data=[b"foo"], operation_type=operation_type) + + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + error_msg, + ) in caplog.record_tuples + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_with_invalid_parameters( + httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the LRS backend `write` method, given invalid_parameters + should raise a `BackendParameterException`. + """ + backend: AsyncLRSDataBackend = lrs_backend() + if not isinstance(backend, AsyncLRSDataBackend): + # Only async backends support `concurrency`. 
+ await backend.close() + return + + error = "concurrency must be a strictly positive integer" + with pytest.raises(BackendParameterException, match=error): + with caplog.at_level(logging.ERROR): + await backend.write(data=[b"foo"], concurrency=-1) + + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + error, + ) in caplog.record_tuples + + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_with_target( + httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the LRS backend `write` method with a target parameter value writes + statements to the target endpoint. + """ + backend: AsyncLRSDataBackend = lrs_backend() + data = [mock_statement() for _ in range(3)] + + # Mock HTTPX POST + url = "http://fake-lrs.com/not-xAPI/not-statements/" + httpx_mock.add_response( + url=url, + method="POST", + json=data, + ) + + with caplog.at_level(logging.DEBUG): + assert await backend.write(data=data, target="/not-xAPI/not-statements/") == 3 + + assert ( + f"ralph.backends.data.{backend.name}", + logging.DEBUG, + f"Start writing to the {url} endpoint (chunk size: 500)", + ) in caplog.record_tuples + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_with_target_and_binary_data( + httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the LRS backend `write` method with a target parameter value given + binary statements, writes them to the target endpoint. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + data = [mock_statement() for _ in range(3)] + bytes_data = [json.dumps(d).encode("utf-8") for d in data] + + # Mock HTTPX POST + url = "http://fake-lrs.com/not-xAPI/not-statements/" + httpx_mock.add_response( + url=url, + method="POST", + json=data, + ) + + with caplog.at_level(logging.DEBUG): + assert ( + await backend.write(data=bytes_data, target="/not-xAPI/not-statements/") + == 3 + ) + + assert ( + f"ralph.backends.data.{backend.name}", + logging.DEBUG, + f"Start writing to the {url} endpoint (chunk size: 500)", + ) in caplog.record_tuples + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "operation_type", + [BaseOperationType.CREATE, BaseOperationType.INDEX], +) +async def test_backends_data_async_lrs_write_with_create_or_index_operation( + operation_type, httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the `LRSHTTP.write` method with `CREATE` or `INDEX` operation_type writes + statements to the given target endpoint. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + data = [mock_statement() for _ in range(3)] + + # Mock HTTPX POST + url = "http://fake-lrs.com/xAPI/statements/" + httpx_mock.add_response(url=url, method="POST", json=data) + + with caplog.at_level(logging.DEBUG): + assert await backend.write(data=data, operation_type=operation_type) == 3 + + assert ( + f"ralph.backends.data.{backend.name}", + logging.DEBUG, + "Posted 3 statements", + ) in caplog.record_tuples + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_with_post_exception( + httpx_mock: HTTPXMock, + lrs_backend, + caplog, +): + """Test the `LRSHTTP.write` method with HTTP error.""" + backend: AsyncLRSDataBackend = lrs_backend() + data = [mock_statement()] + + # Mock HTTPX POST + url = "http://fake-lrs.com/xAPI/statements/" + httpx_mock.add_response(url=url, method="POST", json=data, status_code=500) + with pytest.raises(BackendException): + with caplog.at_level(logging.ERROR): + await backend.write(data=data) + + msg = ( + "Failed to post statements: Server error '500 Internal Server Error' for url " + "'http://fake-lrs.com/xAPI/statements/'\nFor more information check: " + "https://httpstatuses.com/500" + ) + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + msg, + ) in caplog.record_tuples + + # Given `ignore_errors=True` the `write` method should log a warning message. 
+ with caplog.at_level(logging.WARNING): + assert not (await backend.write(data=data, ignore_errors=True)) + + assert ( + f"ralph.backends.data.{backend.name}", + logging.WARNING, + msg, + ) in caplog.record_tuples + await backend.close() + + +# Asynchronicity tests for dev purposes (skip in CI) + + +@pytest.mark.skip(reason="Timing based tests are too unstable to run in CI.") +@pytest.mark.anyio +@pytest.mark.parametrize( + "num_pages,chunk_size,network_latency_time", [(3, 3, 0.2), (10, 3, 0.2)] +) +async def test_backends_data_async_lrs_read_concurrency( + httpx_mock: HTTPXMock, num_pages, chunk_size, network_latency_time, lrs_backend +): + """Test concurrency performances in `read`, for development use. + + Args: + num_pages: the number of pages to generate + chunk_size: the number of results per page + network_latency_time: the wait duration before GET results + + NB: Maximal gains are (num_pages-1)*fetch_time; when batch_processing_time > + fetch_time + """ + backend: AsyncLRSDataBackend = lrs_backend() + if not isinstance(backend, AsyncLRSDataBackend): + # Only async backends support `concurrency`. 
+ await backend.close() + return + + async def _simulate_network_latency(request: httpx.Request, response): + """Return requested statements after an async sleep time.""" + + await asyncio.sleep(network_latency_time) + return httpx.Response(status_code=200, json=response) + + async def _simulate_slow_processing(): + """Async sleep for a fixed amount of time.""" + # Time per chunk = API response time to maximize saved time + processing_time = network_latency_time / chunk_size + await asyncio.sleep(processing_time) + + # Generate fake targets + targets = {0: "/xAPI/statements/"} + for index in range(1, num_pages): + targets[index] = f"/xAPI/statements/?pit_id=fake-pit-{index}" + + # Generate fake statements + all_statements = {} + for index in range(num_pages): + all_statements[index] = { + "statements": [mock_statement() for _ in range(chunk_size)] + } + if index < num_pages - 1: + all_statements[index]["more"] = targets[index + 1] + + # Mock HTTPX GET + for index in range(num_pages): + pit_id = f"&pit_id=fake-pit-{index}" + if index == 0: + pit_id = "" + url = f"http://fake-lrs.com/xAPI/statements/?limit={chunk_size}{pit_id}" + statements = all_statements[index] + httpx_mock.add_callback( + partial(_simulate_network_latency, response=statements), + url=url, + method="GET", + ) + + # Check that read with `prefetch` is faster than without when processing is slow + time_1 = time.time() + async for _ in backend.read(target=targets[0], chunk_size=chunk_size): + await _simulate_slow_processing() + without_prefetch_duration = time.time() - time_1 + + time_2 = time.time() + async for _ in backend.read(target=targets[0], chunk_size=chunk_size, prefetch=100): + await _simulate_slow_processing() + prefetch_duration = time.time() - time_2 + + # Assert gains are close enough to theoretical gains + proximity_ratio = 0.9 + assert ( + without_prefetch_duration + > prefetch_duration + proximity_ratio * (num_pages - 1) * network_latency_time + ) + + +@pytest.mark.skip(reason="Timing 
based tests are too unstable to run in CI") +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_concurrency( + httpx_mock: HTTPXMock, lrs_backend +): + """Test concurrency performances in `write`, for development use.""" + backend: AsyncLRSDataBackend = lrs_backend() + if not isinstance(backend, AsyncLRSDataBackend): + # Only async backends support `concurrency`. + await backend.close() + return + + data = [mock_statement() for _ in range(6)] + + # Changing data length might break tests + assert len(data) == 6 + + # Mock HTTPX POST + async def simulate_network_latency(_): + await asyncio.sleep(0.5) + return httpx.Response(status_code=200) + + httpx_mock.add_callback(simulate_network_latency) + + # Check that concurrent write is faster than non-concurrent + time_1 = time.time() + await backend.write(data, chunk_size=2, concurrency=1) + non_concurrent_duration = time.time() - time_1 + + time_2 = time.time() + await backend.write(data=data, chunk_size=2, concurrency=3) + concurrent_duration = time.time() - time_2 + + # Server side processing time should be 3 times faster + assert non_concurrent_duration > 2.1 * concurrent_duration + + # Check that write with `concurrency`=2 functions as expected + time_1 = time.time() + await backend.write(data=data, chunk_size=1, concurrency=2) + limited_concurrency_duration = time.time() - time_1 + + # Server side processing time should be 3 times faster with unlimited + assert limited_concurrency_duration > 2.1 * concurrent_duration + assert limited_concurrency_duration <= 3.1 * concurrent_duration diff --git a/tests/backends/data/test_base.py b/tests/backends/data/test_base.py index e7f8f355a..0b5c76a55 100644 --- a/tests/backends/data/test_base.py +++ b/tests/backends/data/test_base.py @@ -29,7 +29,7 @@ ], ) def test_backends_data_base_validate_backend_query_with_valid_input(value, expected): - """Test the enforce_query_checks function given valid input.""" + """Test the validate_backend_query function given valid 
input.""" class MockBaseDataBackend(BaseDataBackend[BaseDataBackendSettings, BaseQuery]): """A class mocking the base data backend class.""" @@ -62,10 +62,10 @@ def close(self): ), ], ) -def test_backends_data_base_enforce_query_checks_with_invalid_input( +def test_backends_data_base_validate_backend_query_with_invalid_input( value, error, caplog ): - """Test the enforce_query_checks function given invalid input.""" + """Test the validate_backend_query function given invalid input.""" class MockBaseDataBackend(BaseDataBackend[BaseDataBackendSettings, BaseQuery]): """A class mocking the base database class.""" diff --git a/tests/backends/http/__init__.py b/tests/backends/http/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/backends/http/test_async_lrs.py b/tests/backends/http/test_async_lrs.py deleted file mode 100644 index 9c6b6df2f..000000000 --- a/tests/backends/http/test_async_lrs.py +++ /dev/null @@ -1,1083 +0,0 @@ -"""Tests for Ralph LRS HTTP backend.""" - -import asyncio -import json -import logging -import time -from functools import partial -from urllib.parse import ParseResult, parse_qsl, urlencode, urljoin, urlparse - -import httpx -import pytest -from httpx import HTTPStatusError, RequestError -from pydantic import AnyHttpUrl, parse_obj_as -from pytest_httpx import HTTPXMock - -from ralph.backends.http.async_lrs import ( - AsyncLRSHTTPBackend, - LRSHeaders, - LRSHTTPBackendSettings, - OperationType, -) -from ralph.backends.http.base import HTTPBackendStatus -from ralph.backends.lrs.base import LRSStatementsQuery -from ralph.exceptions import BackendException, BackendParameterException - -from ...helpers import mock_statement - - -async def _unpack_async_generator(async_gen): - """Unpack content of async generator into list.""" - result = [] - async for value in async_gen: - result.append(value) - return result - - -def test_backends_http_async_lrs_default_instantiation(monkeypatch, fs): - """Test the `LRSHTTPBackend` 
default instantiation.""" - fs.create_file(".env") - backend_settings_names = [ - "BASE_URL", - "USERNAME", - "PASSWORD", - "HEADERS", - "STATUS_ENDPOINT", - "STATEMENTS_ENDPOINT", - ] - for name in backend_settings_names: - monkeypatch.delenv(f"RALPH_BACKENDS__HTTP__LRS__{name}", raising=False) - - assert AsyncLRSHTTPBackend.name == "async_lrs" - assert AsyncLRSHTTPBackend.settings_class == LRSHTTPBackendSettings - backend = AsyncLRSHTTPBackend() - assert backend.query == LRSStatementsQuery - assert backend.base_url == parse_obj_as(AnyHttpUrl, "http://0.0.0.0:8100") - assert backend.auth == ("ralph", "secret") - assert backend.settings.HEADERS == LRSHeaders() - assert backend.settings.STATUS_ENDPOINT == "/__heartbeat__" - assert backend.settings.STATEMENTS_ENDPOINT == "/xAPI/statements" - - # Test overriding default values with environment variables. - monkeypatch.setenv("RALPH_BACKENDS__HTTP__LRS__USERNAME", "foo") - backend = AsyncLRSHTTPBackend() - assert backend.auth == ("foo", "secret") - - -def test_backends_http_async_lrs_instantiation(): - """Test the LRS backend default instantiation.""" - - headers = LRSHeaders( - X_EXPERIENCE_API_VERSION="1.0.3", CONTENT_TYPE="application/json" - ) - settings = LRSHTTPBackendSettings( - BASE_URL="http://fake-lrs.com", - USERNAME="user", - PASSWORD="pass", - HEADERS=headers, - STATUS_ENDPOINT="/fake-status-endpoint", - STATEMENTS_ENDPOINT="/xAPI/statements", - ) - - assert AsyncLRSHTTPBackend.name == "async_lrs" - assert AsyncLRSHTTPBackend.settings_class == LRSHTTPBackendSettings - backend = AsyncLRSHTTPBackend(settings) - assert backend.query == LRSStatementsQuery - assert isinstance(backend.base_url, AnyHttpUrl) - assert backend.auth == ("user", "pass") - assert backend.settings.HEADERS.CONTENT_TYPE == "application/json" - assert backend.settings.HEADERS.X_EXPERIENCE_API_VERSION == "1.0.3" - assert backend.settings.STATUS_ENDPOINT == "/fake-status-endpoint" - assert backend.settings.STATEMENTS_ENDPOINT == 
"/xAPI/statements" - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_status_with_successful_request( - httpx_mock: HTTPXMock, -): - """Test the LRS backend status method returns `OK` when the request is - successful. - """ - base_url = "http://fake-lrs.com" - status_endpoint = "/__heartbeat__" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - STATUS_ENDPOINT=status_endpoint, - ) - backend = AsyncLRSHTTPBackend(settings) - - # Mock GET response of HTTPX - httpx_mock.add_response( - url=urljoin(base_url, status_endpoint), method="GET", status_code=200 - ) - - status = await backend.status() - assert status == HTTPBackendStatus.OK - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_status_with_request_error( - httpx_mock: HTTPXMock, caplog -): - """Test the LRS backend status method returns `AWAY` when a `RequestError` - exception is caught. - """ - - base_url = "http://fake-lrs.com" - status_endpoint = "/__heartbeat__" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - STATUS_ENDPOINT=status_endpoint, - ) - backend = AsyncLRSHTTPBackend(settings) - - httpx_mock.add_exception(RequestError("Test Request Error")) - - with caplog.at_level(logging.ERROR): - status = await backend.status() - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - "Unable to request the server", - ) in caplog.record_tuples - - assert status == HTTPBackendStatus.AWAY - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_status_with_http_status_error( - httpx_mock: HTTPXMock, caplog -): - """Test the LRS backend status method returns `ERROR` when an `HTTPStatusError` - is caught. 
- """ - - base_url = "http://fake-lrs.com" - status_endpoint = "/__heartbeat__" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - STATUS_ENDPOINT=status_endpoint, - ) - backend = AsyncLRSHTTPBackend(settings) - - httpx_mock.add_exception( - HTTPStatusError("Test HTTP Status Error", request=None, response=None) - ) - - with caplog.at_level(logging.ERROR): - status = await backend.status() - - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - "Response raised an HTTP status of 4xx or 5xx", - ) in caplog.record_tuples - assert status == HTTPBackendStatus.ERROR - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_list(caplog): - """Test the LRS backend `list` method raises a `NotImplementedError`.""" - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - msg = ( - "LRS HTTP backend does not support `list` method, " - "cannot list from /xAPI/statements/" - ) - with pytest.raises(NotImplementedError, match=msg): - with caplog.at_level(logging.ERROR): - await backend.list(target=target) - - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - ( - "LRS HTTP backend does not support `list` method, cannot list" - f" from {target}" - ), - ) in caplog.record_tuples - - -@pytest.mark.parametrize("max_statements", [None, 2, 4, 8]) -@pytest.mark.anyio -async def test_backends_http_async_lrs_read_max_statements( - httpx_mock: HTTPXMock, max_statements: int -): - """Test the LRS backend `read` method `max_statements` property.""" - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - more_target = "/xAPI/statements/?pit_id=fake-pit-id" - - chunk_size = 3 - - statements = { - "statements": [mock_statement() for _ in range(chunk_size)], - "more": more_target, - } - more_statements = { - "statements": [mock_statement() for _ in 
range(chunk_size)], - } - - # Mock GET response of HTTPX for target and "more" target without query parameter - default_params = LRSStatementsQuery(limit=chunk_size).dict( - exclude_none=True, exclude_unset=True - ) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=target, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements, - ) - - default_params.update(dict(parse_qsl(urlparse(more_target).query))) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=urlparse(more_target).path, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=more_statements, - ) - - settings = AsyncLRSHTTPBackend.settings_class( - BASE_URL=base_url, USERNAME="user", PASSWORD="pass" - ) - backend = AsyncLRSHTTPBackend(settings) - - # Return an iterable of dict - result = await _unpack_async_generator( - backend.read( - target=target, max_statements=max_statements, chunk_size=chunk_size - ) - ) - all_statements = statements["statements"] + more_statements["statements"] - assert len(all_statements) == 6 - - # Assert that result is of the proper length - if max_statements is None: - assert result == all_statements - else: - assert result == all_statements[:max_statements] - - -@pytest.mark.parametrize("greedy", [False, True]) -@pytest.mark.anyio -async def test_backends_http_async_lrs_read_without_target( - httpx_mock: HTTPXMock, greedy: bool -): - """Test that the LRS backend `read` method without target parameter value fetches - statements from '/xAPI/statements' default endpoint. 
- """ - - base_url = "http://fake-lrs.com" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - statements = {"statements": [mock_statement() for _ in range(3)]} - - # Mock HTTPX GET - default_params = LRSStatementsQuery(limit=500).dict( - exclude_none=True, exclude_unset=True - ) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=backend.settings.STATEMENTS_ENDPOINT, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements, - ) - - result = await _unpack_async_generator(backend.read(greedy=greedy)) - assert result == statements["statements"] - assert len(result) == 3 - - -@pytest.mark.anyio -@pytest.mark.parametrize("greedy", [False, True]) -async def test_backends_http_async_lrs_read_backend_error( - httpx_mock: HTTPXMock, caplog, greedy: bool -): - """Test the LRS backend `read` method raises a `BackendException` when the server - returns an error. 
- """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - # Mock GET response of HTTPX - default_params = LRSStatementsQuery(limit=500).dict( - exclude_none=True, exclude_unset=True - ) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=target, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - status_code=500, - ) - - with pytest.raises(BackendException, match="Failed to fetch statements."): - with caplog.at_level(logging.ERROR): - await _unpack_async_generator(backend.read(target=target, greedy=greedy)) - - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - "Failed to fetch statements.", - ) in caplog.record_tuples - - -@pytest.mark.anyio -@pytest.mark.parametrize("greedy", [False, True]) -async def test_backends_http_async_lrs_read_without_pagination( - httpx_mock: HTTPXMock, greedy: bool -): - """Test the LRS backend `read` method when the request on the target endpoint - returns statements without pagination.""" - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - statements = { - "statements": [ - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), - ] - } - - # Mock GET response of HTTPX without query parameter - default_params = LRSStatementsQuery(limit=500).dict( - exclude_none=True, exclude_unset=True - ) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - 
path=target, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements, - ) - - # Return an iterable of dict - result = await _unpack_async_generator( - backend.read(target=target, raw_output=False, greedy=greedy) - ) - assert result == statements["statements"] - assert len(result) == 3 - - # Return an iterable of bytes - result = await _unpack_async_generator( - backend.read(target=target, raw_output=True, greedy=greedy) - ) - assert result == [ - bytes(json.dumps(statement), encoding="utf-8") - for statement in statements["statements"] - ] - assert len(result) == 3 - - # Mock GET response of HTTPX with query parameter - query = LRSStatementsQuery(verb="https://w3id.org/xapi/video/verbs/played") - - statements_with_query_played_verb = { - "statements": [ - raw for raw in statements["statements"] if "played" in raw["verb"]["id"] - ] - } - query_params = query.dict(exclude_none=True, exclude_unset=True) - query_params.update({"limit": 500}) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=target, - query=urlencode(query_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements_with_query_played_verb, - ) - # Return an iterable of dict - result = await _unpack_async_generator( - backend.read(query=query, target=target, raw_output=False, greedy=greedy) - ) - assert result == statements_with_query_played_verb["statements"] - assert len(result) == 2 - - # Return an iterable of bytes - result = await _unpack_async_generator( - backend.read(query=query, target=target, raw_output=True, greedy=greedy) - ) - assert result == [ - bytes(json.dumps(statement), encoding="utf-8") - for statement in statements_with_query_played_verb["statements"] - ] - assert len(result) == 2 - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_read_with_pagination(httpx_mock: HTTPXMock): - """Test the LRS backend 
`read` method when the request on the target endpoint - returns statements with pagination.""" - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - more_target = "/xAPI/statements/?pit_id=fake-pit-id" - statements = { - "statements": [ - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), - mock_statement( - verb={"id": "https://w3id.org/xapi/video/verbs/initialized"} - ), - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), - ], - "more": more_target, - } - more_statements = { - "statements": [ - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/seeked"}), - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), - ] - } - - # Mock GET response of HTTPX for target and "more" target without query parameter - default_params = LRSStatementsQuery(limit=500).dict( - exclude_none=True, exclude_unset=True - ) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=target, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements, - ) - default_params.update(dict(parse_qsl(urlparse(more_target).query))) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=urlparse(more_target).path, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=more_statements, - ) - - # Return an iterable of dict - result = await _unpack_async_generator( - backend.read(target=target, raw_output=False) - ) - assert result == statements["statements"] + more_statements["statements"] - assert len(result) == 6 - - # Return an iterable of bytes - 
result = await _unpack_async_generator(backend.read(target=target, raw_output=True)) - assert result == [ - bytes(json.dumps(statement), encoding="utf-8") - for statement in statements["statements"] + more_statements["statements"] - ] - assert len(result) == 6 - - # Mock GET response of HTTPX with query parameter - query = LRSStatementsQuery(verb="https://w3id.org/xapi/video/verbs/played") - - statements_with_query_played_verb = { - "statements": [ - raw for raw in statements["statements"] if "played" in raw["verb"]["id"] - ], - "more": more_target, - } - more_statements_with_query_played_verb = { - "statements": [ - raw - for raw in more_statements["statements"] - if "played" in raw["verb"]["id"] - ] - } - query_params = query.dict(exclude_none=True, exclude_unset=True) - query_params.update({"limit": 500}) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=target, - query=urlencode(query_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements_with_query_played_verb, - ) - query_params.update(dict(parse_qsl(urlparse(more_target).query))) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=urlparse(more_target).path, - query=urlencode(query_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=more_statements_with_query_played_verb, - ) - - # Return an iterable of dict - - result = await _unpack_async_generator( - backend.read(query=query, target=target, raw_output=False) - ) - assert ( - result - == statements_with_query_played_verb["statements"] - + more_statements_with_query_played_verb["statements"] - ) - assert len(result) == 2 - - # Return an iterable of bytes - result = await _unpack_async_generator( - backend.read(query=query, target=target, raw_output=True) - ) - - assert result == [ - bytes(json.dumps(statement), encoding="utf-8") - for statement 
in statements_with_query_played_verb["statements"] - + more_statements_with_query_played_verb["statements"] - ] - assert len(result) == 2 - - -@pytest.mark.anyio -@pytest.mark.parametrize( - "chunk_size,simultaneous,max_num_simultaneous", - [ - (None, False, None), - (500, False, None), - (2, False, None), - (500, True, 1), - (2, True, 1), - (500, True, 2), - (2, True, 2), - (500, True, None), - (2, True, None), - ], -) -async def test_backends_http_async_lrs_write_without_operation( - httpx_mock: HTTPXMock, caplog, chunk_size, simultaneous, max_num_simultaneous -): - """Test the LRS backend `write` method, given no operation_type should POST to - the LRS server. - """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - data = [mock_statement() for _ in range(6)] - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - # Mock HTTPX POST - httpx_mock.add_response(url=urljoin(base_url, target), method="POST", json=data) - - with caplog.at_level(logging.DEBUG): - result = await backend.write( - target=target, - data=data, - chunk_size=chunk_size, - simultaneous=simultaneous, - max_num_simultaneous=max_num_simultaneous, - ) - - # If no chunk_size is provided, a default value (500) should be used. - if chunk_size is None: - chunk_size = 500 - - assert ( - "ralph.backends.http.async_lrs", - logging.DEBUG, - f"Start writing to the {base_url}{target} endpoint (chunk size: {chunk_size})", - ) in caplog.record_tuples - - assert ( - "ralph.backends.http.async_lrs", - logging.DEBUG, - f"Posted {len(data)} statements", - ) in caplog.record_tuples - - assert result == len(data) - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_write_without_data(caplog): - """Test the LRS backend `write` method returns null when no data to write - in the target endpoint are given. 
- """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - with caplog.at_level(logging.INFO): - result = await backend.write(target=target, data=[]) - - assert ( - "ralph.backends.http.async_lrs", - logging.INFO, - "Data Iterator is empty; skipping write to target", - ) in caplog.record_tuples - - assert result == 0 - - -@pytest.mark.parametrize( - "operation_type,error_msg", - [ - (OperationType.APPEND, "append operation type is not supported."), - (OperationType.UPDATE, "update operation type is not supported."), - (OperationType.DELETE, "delete operation type is not supported."), - ], -) -@pytest.mark.anyio -async def test_backends_http_async_lrs_write_with_unsupported_operation( - operation_type, caplog, error_msg -): - """Test the LRS backend `write` method, given an unsupported` `operation_type`, - should raise a `BackendParameterException`. 
- """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - with pytest.raises(BackendParameterException, match=error_msg): - with caplog.at_level(logging.ERROR): - await backend.write( - target=target, data=[b"foo"], operation_type=operation_type - ) - - assert ( - "ralph.backends.http.async_lrs", - logging.DEBUG, - f"{operation_type.value} operation type is not supported.", - ) in caplog.record_tuples - - -@pytest.mark.parametrize( - "simultaneous,max_num_simultaneous,error_msg", - [ - (True, 0, "max_num_simultaneous must be a strictly positive integer"), - (False, 2, "max_num_simultaneous can only be used with `simultaneous=True`"), - ], -) -@pytest.mark.anyio -async def test_backends_https_async_lrs_write_with_invalid_parameters( - caplog, simultaneous, max_num_simultaneous, error_msg -): - """Test the LRS backend `write` method, given invalid_parameters - should raise a `BackendParameterException`. - """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - with pytest.raises(BackendParameterException, match=error_msg): - with caplog.at_level(logging.ERROR): - await backend.write( - target=target, - data=[b"foo"], - simultaneous=simultaneous, - max_num_simultaneous=max_num_simultaneous, - ) - - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - error_msg, - ) in caplog.record_tuples - - -@pytest.mark.anyio -async def test_backends_https_async_lrs_write_without_target( - httpx_mock: HTTPXMock, caplog -): - """Test the LRS backend `write` method without target parameter value writes - statements to '/xAPI/statements' default endpoint. 
- """ - - base_url = "http://fake-lrs.com" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - data = [mock_statement() for _ in range(3)] - - # Mock HTTPX POST - httpx_mock.add_response( - url=urljoin(base_url, backend.settings.STATEMENTS_ENDPOINT), - method="POST", - json=data, - ) - - with caplog.at_level(logging.DEBUG): - result = await backend.write(data=data, operation_type=OperationType.CREATE) - assert ( - "ralph.backends.http.async_lrs", - logging.DEBUG, - "Start writing to the " - f"{base_url}{LRSHTTPBackendSettings().STATEMENTS_ENDPOINT} " - "endpoint (chunk size: 500)", - ) in caplog.record_tuples - - assert result == len(data) - - -@pytest.mark.anyio -async def test_backends_https_async_lrs_write_with_create_or_index_operation( - httpx_mock: HTTPXMock, caplog -): - """Test the `LRSHTTP.write` method with `CREATE` or `INDEX` operation_type writes - statements to the given target endpoint. 
- """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - data = [mock_statement() for _ in range(3)] - - # Mock HTTPX POST - httpx_mock.add_response(url=urljoin(base_url, target), method="POST", json=data) - - with caplog.at_level(logging.INFO): - result = await backend.write( - target=target, data=data, operation_type=OperationType.CREATE - ) - assert result == len(data) - - with caplog.at_level(logging.INFO): - result = await backend.write( - target=target, data=data, operation_type=OperationType.INDEX - ) - assert result == len(data) - - -@pytest.mark.anyio -async def test_backends_https_async_lrs_write_backend_exception( - httpx_mock: HTTPXMock, - caplog, -): - """Test the `LRSHTTP.write` method with HTTP error.""" - base_url = "http://fake-lrs.com" - target = "/xAPI/statements" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - data = [mock_statement()] - - # Mock HTTPX POST - httpx_mock.add_response( - url=urljoin(base_url, target), method="POST", json=data, status_code=500 - ) - with pytest.raises(BackendException): - with caplog.at_level(logging.ERROR): - await backend.write(target=target, data=data) - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - "Failed to post statements", - ) in caplog.record_tuples - - -# Asynchronicity tests for dev purposes (skip in CI) - - -@pytest.mark.skip(reason="Timing based tests are too unstable to run in CI.") -@pytest.mark.anyio -@pytest.mark.parametrize( - "num_pages,chunk_size,network_latency_time", [(3, 3, 0.2), (10, 3, 0.2)] -) -async def test_backends_https_async_lrs_read_concurrency( - httpx_mock: HTTPXMock, num_pages, chunk_size, network_latency_time -): - """Test concurrency performances in `read`, for development use. 
- - Args: - num_pages: the number of pages to generate - chunk_size: the number of results per page - network_latency_time: the wait duration before GET results - - NB: Maximal gains are (num_pages-1)*fetch_time; when batch_processing_time > - fetch_time - """ - - async def _simulate_network_latency(request: httpx.Request, response): - """Return requested statements after an async sleep time.""" - - await asyncio.sleep(network_latency_time) - return httpx.Response(status_code=200, json=response) - - async def _simulate_slow_processing(): - """Async sleep for a fixed amount of time.""" - # Time per chunk = API response time to maximize saved time - processing_time = network_latency_time / chunk_size - await asyncio.sleep(processing_time) - - base_url = "http://fake-lrs.com" - - # Generate fake targets - target_template = "/xAPI/statements/?pit_id=fake-pit-{}" - targets = {0: "/xAPI/statements/"} - for index in range(1, num_pages): - targets[index] = target_template.format(index) - - # Generate fake statements - all_statements = {} - for index in range(num_pages): - all_statements[index] = { - "statements": [mock_statement() for _ in range(chunk_size)] - } - if index < num_pages - 1: - all_statements[index]["more"] = targets[index + 1] - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - # Mock HTTPX GET - params = {"limit": chunk_size} - for index in range(num_pages): - url = ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=urlparse(targets[index]).path, - query=urlencode(params), - params="", - fragment="", - ).geturl() - - statements = all_statements[index] - httpx_mock.add_callback( - partial(_simulate_network_latency, response=statements), - url=url, - method="GET", - ) - - if index < num_pages - 1: - params.update(dict(parse_qsl(urlparse(targets[index + 1]).query))) - - # Check that greedy read is faster than non-greedy when 
processing is slow - time_1 = time.time() - counter = 0 - async for _ in backend.read(target=targets[0], chunk_size=chunk_size, greedy=False): - await _simulate_slow_processing() - counter += 1 - duration_non_greedy = time.time() - time_1 - - time_2 = time.time() - async for _ in backend.read(target=targets[0], chunk_size=chunk_size, greedy=True): - await _simulate_slow_processing() - duration_greedy = time.time() - time_2 - - # Assert gains are close enough to theoretical gains - proximity_ratio = 0.9 - assert ( - duration_non_greedy - > duration_greedy + proximity_ratio * (num_pages - 1) * network_latency_time - ) - - -@pytest.mark.skip(reason="Timing based tests are too unstable to run in CI") -@pytest.mark.anyio -async def test_backends_https_async_lrs_write_concurrency( - httpx_mock: HTTPXMock, -): - """Test concurrency performances in `write`, for development use.""" - - base_url = "http://fake-lrs.com" - - data = [mock_statement() for _ in range(6)] - - # Changing data length might break tests - assert len(data) == 6 - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - # Mock HTTPX POST - async def simulate_network_latency(request: httpx.Request): - await asyncio.sleep(0.5) - return httpx.Response( - status_code=200, - ) - - httpx_mock.add_callback(simulate_network_latency) - - # Check that simultaneous write is faster than non-simultaneous - time_1 = time.time() - await backend.write( - data=data, - chunk_size=2, - simultaneous=False, - max_num_simultaneous=None, - ) - duration_non_simultaneous = time.time() - time_1 - - time_2 = time.time() - await backend.write( - data=data, - chunk_size=2, - simultaneous=True, - max_num_simultaneous=None, - ) - duration_simultaneous = time.time() - time_2 - - # Server side processing time should be 3 times faster - assert duration_non_simultaneous > 2.1 * duration_simultaneous - - # Check that write with max_num_simultaneous 
functions as expected - time_1 = time.time() - await backend.write( - data=data, - chunk_size=1, - simultaneous=True, - max_num_simultaneous=2, - ) - duration_limited_simultaneous = time.time() - time_1 - - time_2 = time.time() - await backend.write( - data=data, - chunk_size=1, - simultaneous=True, - max_num_simultaneous=None, - ) - duration_unlimited_simultaneous = time.time() - time_2 - - # Server side processing time should be 3 times faster with unlimited - assert duration_limited_simultaneous > 2.1 * duration_unlimited_simultaneous - assert duration_limited_simultaneous <= 3.1 * duration_unlimited_simultaneous diff --git a/tests/backends/http/test_base.py b/tests/backends/http/test_base.py deleted file mode 100644 index d5bbf9333..000000000 --- a/tests/backends/http/test_base.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Tests for Ralph base HTTP backend.""" - -from typing import Iterator, Union - -from ralph.backends.http.base import BaseHTTPBackend, BaseQuery - - -def test_backends_http_base_abstract_interface_with_implemented_abstract_method(): - """Test the interface mechanism with properly implemented abstract methods.""" - - class GoodStorage(BaseHTTPBackend): - """Correct implementation with required abstract methods.""" - - name = "good" - - async def status(self): - """Fake the status method.""" - - async def list( - self, target: str = None, details: bool = False, new: bool = False - ) -> Iterator[Union[str, dict]]: - """Fake the list method.""" - - async def read(self): - """Fake the read method.""" - - async def write(self): - """Fake the write method.""" - - GoodStorage() - - assert GoodStorage.name == "good" - assert GoodStorage.query == BaseQuery diff --git a/tests/backends/http/test_lrs.py b/tests/backends/http/test_lrs.py deleted file mode 100644 index 130fb0df8..000000000 --- a/tests/backends/http/test_lrs.py +++ /dev/null @@ -1,167 +0,0 @@ -"""Tests for Ralph Async LRS HTTP backend.""" - -import asyncio -import re -from unittest.mock import 
AsyncMock - -import pytest -from pydantic import AnyHttpUrl, parse_obj_as - -from ralph.backends.http.async_lrs import ( - AsyncLRSHTTPBackend, - HTTPBackendStatus, - LRSHeaders, - LRSHTTPBackendSettings, -) -from ralph.backends.http.lrs import LRSHTTPBackend -from ralph.backends.lrs.base import LRSStatementsQuery - - -@pytest.mark.anyio -@pytest.mark.parametrize("method", ["status", "list", "write", "read"]) -async def test_backends_http_lrs_in_async_setting(monkeypatch, method): - """Test that backend returns the proper error when run in async function.""" - - # Define mock responses - if method == "read": - read_mock_response = [{"hello": "world"}, {"save": "pandas"}] - - async def response_mock(*args, **kwargs): - """Mock a read function.""" - - for statement in read_mock_response: - yield statement - - else: - response_mock = AsyncMock(return_value=HTTPBackendStatus.OK) - monkeypatch.setattr(AsyncLRSHTTPBackend, method, response_mock) - - async def async_function(): - """Encapsulate the synchronous method in an asynchronous function.""" - lrs = LRSHTTPBackend() - if method == "read": - list(getattr(lrs, method)()) - else: - getattr(lrs, method)() - - # Check that the proper error is raised - with pytest.raises( - RuntimeError, - match=re.escape( - ( - f"This event loop is already running. You must use " - f"`AsyncLRSHTTPBackend.{method}` (instead of `LRSHTTPBackend.{method}`)" - ", or run this code outside the current event loop." 
- ) - ), - ): - await async_function() - - -@pytest.mark.anyio -def test_backends_http_lrs_default_instantiation(monkeypatch, fs): - """Test the `LRSHTTPBackend` default instantiation.""" - fs.create_file(".env") - backend_settings_names = [ - "BASE_URL", - "USERNAME", - "PASSWORD", - "HEADERS", - "STATUS_ENDPOINT", - "STATEMENTS_ENDPOINT", - ] - for name in backend_settings_names: - monkeypatch.delenv(f"RALPH_BACKENDS__HTTP__LRS__{name}", raising=False) - - assert LRSHTTPBackend.name == "lrs" - assert LRSHTTPBackend.settings_class == LRSHTTPBackendSettings - backend = LRSHTTPBackend() - assert backend.query == LRSStatementsQuery - assert backend.base_url == parse_obj_as(AnyHttpUrl, "http://0.0.0.0:8100") - assert backend.auth == ("ralph", "secret") - assert backend.settings.HEADERS == LRSHeaders() - assert backend.settings.STATUS_ENDPOINT == "/__heartbeat__" - assert backend.settings.STATEMENTS_ENDPOINT == "/xAPI/statements" - - # Test overriding default values with environment variables. 
- monkeypatch.setenv("RALPH_BACKENDS__HTTP__LRS__USERNAME", "foo") - backend = LRSHTTPBackend() - assert backend.auth == ("foo", "secret") - - -def test_backends_http_lrs_instantiation_with_settings(): - """Test the LRS backend default instantiation.""" - - headers = LRSHeaders( - X_EXPERIENCE_API_VERSION="1.0.3", CONTENT_TYPE="application/json" - ) - settings = LRSHTTPBackendSettings( - BASE_URL="http://fake-lrs.com", - USERNAME="user", - PASSWORD="pass", - HEADERS=headers, - STATUS_ENDPOINT="/fake-status-endpoint", - STATEMENTS_ENDPOINT="/xAPI/statements", - ) - - assert LRSHTTPBackend.name == "lrs" - assert LRSHTTPBackend.settings_class == LRSHTTPBackendSettings - backend = LRSHTTPBackend(settings) - assert backend.query == LRSStatementsQuery - assert isinstance(backend.base_url, AnyHttpUrl) - assert backend.auth == ("user", "pass") - assert backend.settings.HEADERS.CONTENT_TYPE == "application/json" - assert backend.settings.HEADERS.X_EXPERIENCE_API_VERSION == "1.0.3" - assert backend.settings.STATUS_ENDPOINT == "/fake-status-endpoint" - assert backend.settings.STATEMENTS_ENDPOINT == "/xAPI/statements" - - -def test_backends_http_lrs_inheritence(monkeypatch): - """Test that `LRSHTTPBackend` properly inherits from `AsyncLRSHTTPBackend`.""" - lrs = LRSHTTPBackend() - - # Necessary when using anyio - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - # Test class inheritence - assert issubclass(lrs.__class__, AsyncLRSHTTPBackend) - - # Test "status" - status_mock_response = HTTPBackendStatus.OK - status_mock = AsyncMock(return_value=status_mock_response) - monkeypatch.setattr(AsyncLRSHTTPBackend, "status", status_mock) - assert lrs.status() == status_mock_response - status_mock.assert_awaited() - - # Test "list" - list_exception = NotImplementedError - list_mock = AsyncMock(side_effect=list_exception) - monkeypatch.setattr(AsyncLRSHTTPBackend, "list", list_mock) - with pytest.raises(list_exception): - lrs.list() - - # Test "read" - 
read_mock_response = [{"hello": "world"}, {"save": "pandas"}] - read_chunk_size = 11 - - async def read_mock(*args, **kwargs): - """Mock a read function.""" - - # For simplicity, check that parameters are passed in function - assert kwargs["chunk_size"] == read_chunk_size - for statement in read_mock_response: - yield statement - - monkeypatch.setattr(AsyncLRSHTTPBackend, "read", read_mock) - assert list(lrs.read(chunk_size=read_chunk_size)) == read_mock_response - - # Test "write" - write_mock_response = 118218 - chunk_size = 17 - write_mock = AsyncMock(return_value=write_mock_response) - monkeypatch.setattr(AsyncLRSHTTPBackend, "write", write_mock) - assert lrs.write(chunk_size=chunk_size) == write_mock_response - write_mock.assert_called_with(chunk_size=chunk_size) - - loop.close() diff --git a/tests/backends/test_loader.py b/tests/backends/test_loader.py index 07c983ea6..62bff3624 100644 --- a/tests/backends/test_loader.py +++ b/tests/backends/test_loader.py @@ -3,16 +3,16 @@ import logging from ralph.backends.data.async_es import AsyncESDataBackend +from ralph.backends.data.async_lrs import AsyncLRSDataBackend from ralph.backends.data.async_mongo import AsyncMongoDataBackend from ralph.backends.data.clickhouse import ClickHouseDataBackend from ralph.backends.data.es import ESDataBackend from ralph.backends.data.fs import FSDataBackend from ralph.backends.data.ldp import LDPDataBackend +from ralph.backends.data.lrs import LRSDataBackend from ralph.backends.data.mongo import MongoDataBackend from ralph.backends.data.s3 import S3DataBackend from ralph.backends.data.swift import SwiftDataBackend -from ralph.backends.http.async_lrs import AsyncLRSHTTPBackend -from ralph.backends.http.lrs import LRSHTTPBackend from ralph.backends.loader import ( get_backends, get_cli_backends, @@ -61,13 +61,13 @@ def test_backends_loader_get_cli_backends(): """Test the `get_cli_backends` function.""" assert get_cli_backends() == { "async_es": AsyncESDataBackend, - "async_lrs": 
AsyncLRSHTTPBackend, + "async_lrs": AsyncLRSDataBackend, "async_mongo": AsyncMongoDataBackend, "clickhouse": ClickHouseDataBackend, "es": ESDataBackend, "fs": FSDataBackend, "ldp": LDPDataBackend, - "lrs": LRSHTTPBackend, + "lrs": LRSDataBackend, "mongo": MongoDataBackend, "s3": S3DataBackend, "swift": SwiftDataBackend, @@ -79,12 +79,12 @@ def test_backends_loader_get_cli_write_backends(): """Test the `get_cli_write_backends` function.""" assert get_cli_write_backends() == { "async_es": AsyncESDataBackend, - "async_lrs": AsyncLRSHTTPBackend, + "async_lrs": AsyncLRSDataBackend, "async_mongo": AsyncMongoDataBackend, "clickhouse": ClickHouseDataBackend, "es": ESDataBackend, "fs": FSDataBackend, - "lrs": LRSHTTPBackend, + "lrs": LRSDataBackend, "mongo": MongoDataBackend, "s3": S3DataBackend, "swift": SwiftDataBackend, diff --git a/tests/conftest.py b/tests/conftest.py index 6b1050e95..4ae97197c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,10 +27,12 @@ es_forwarding, es_lrs_backend, events, + flavor, fs_backend, fs_lrs_backend, ldp_backend, lrs, + lrs_backend, mongo, mongo_backend, mongo_forwarding, diff --git a/tests/fixtures/backends.py b/tests/fixtures/backends.py index c04f75320..60224670c 100644 --- a/tests/fixtures/backends.py +++ b/tests/fixtures/backends.py @@ -6,9 +6,10 @@ import random import time from contextlib import asynccontextmanager -from functools import lru_cache +from functools import lru_cache, wraps from multiprocessing import Process from pathlib import Path +from typing import Callable, Optional, Union import boto3 import botocore @@ -18,10 +19,12 @@ import websockets from elasticsearch import BadRequestError, Elasticsearch from httpx import AsyncClient, ConnectError +from pydantic import AnyHttpUrl, parse_obj_as from pymongo import MongoClient from pymongo.errors import CollectionInvalid from ralph.backends.data.async_es import AsyncESDataBackend +from ralph.backends.data.async_lrs import AsyncLRSDataBackend from 
ralph.backends.data.async_mongo import AsyncMongoDataBackend from ralph.backends.data.clickhouse import ( ClickHouseClientOptions, @@ -30,6 +33,7 @@ from ralph.backends.data.es import ESDataBackend from ralph.backends.data.fs import FSDataBackend from ralph.backends.data.ldp import LDPDataBackend +from ralph.backends.data.lrs import LRSDataBackend, LRSHeaders from ralph.backends.data.mongo import MongoDataBackend from ralph.backends.data.s3 import S3DataBackend from ralph.backends.data.swift import SwiftDataBackend @@ -241,6 +245,73 @@ def anyio_backend(): return "asyncio" +@pytest.fixture(params=["sync", "async"]) +def flavor(request): + """Parametrize fixture with `sync`/`async` flavor.""" + return request.param + + +@pytest.mark.anyio +@pytest.fixture +def lrs_backend( + flavor, +) -> Callable[[Optional[str]], Union[LRSDataBackend, AsyncLRSDataBackend]]: + """Return the `get_lrs_test_backend` function.""" + backend_class = LRSDataBackend if flavor == "sync" else AsyncLRSDataBackend + + def make_awaitable(sync_func): + """Make a synchronous callable awaitable.""" + + @wraps(sync_func) + async def async_func(*args, **kwargs): + kwargs.pop("concurrency", None) + return sync_func(*args, **kwargs) + + return async_func + + def make_awaitable_generator(sync_func): + """Make a synchronous generator awaitable.""" + + @wraps(sync_func) + async def async_func(*args, **kwargs): + kwargs.pop("prefetch", None) + for item in sync_func(*args, **kwargs): + yield item + + return async_func + + def _get_lrs_test_backend( + base_url: Optional[str] = "http://fake-lrs.com", + ) -> Union[LRSDataBackend, AsyncLRSDataBackend]: + """Return an (Async)LRSDataBackend backend instance using test defaults.""" + headers = { + "X_EXPERIENCE_API_VERSION": "1.0.3", + "CONTENT_TYPE": "application/json", + } + settings = backend_class.settings_class( + BASE_URL=parse_obj_as(AnyHttpUrl, base_url), + USERNAME="user", + PASSWORD="pass", + HEADERS=LRSHeaders.parse_obj(headers), + 
LOCALE_ENCODING="utf8", + STATUS_ENDPOINT="/__heartbeat__", + STATEMENTS_ENDPOINT="/xAPI/statements/", + READ_CHUNK_SIZE=500, + WRITE_CHUNK_SIZE=500, + ) + backend = backend_class(settings) + + if isinstance(backend, LRSDataBackend): + backend.status = make_awaitable(backend.status) # type: ignore + backend.read = make_awaitable_generator(backend.read) # type: ignore + backend.write = make_awaitable(backend.write) # type: ignore + backend.close = make_awaitable(backend.close) # type: ignore + + return backend + + return _get_lrs_test_backend + + @pytest.fixture def async_mongo_backend(): """Return the `get_mongo_data_backend` function.""" diff --git a/tests/test_cli.py b/tests/test_cli.py index ae097ec38..c4e6df334 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -882,11 +882,11 @@ def test_cli_write_command_with_fs_backend(fs): assert result.exit_code == 1 assert "file1 already exists and overwrite is not allowed" in result.output - # Try to create the same file with -f + # Try to create the same file with -o update runner = CliRunner() result = runner.invoke( cli, - "write -b fs -t file1 -f --fs-default-directory-path foo".split(), + "write -b fs -t file1 -o update --fs-default-directory-path foo".split(), input=b"other content", ) diff --git a/tests/test_cli_usage.py b/tests/test_cli_usage.py index 6c19651ed..54c269169 100644 --- a/tests/test_cli_usage.py +++ b/tests/test_cli_usage.py @@ -127,10 +127,13 @@ def test_cli_read_command_usage(): " async_lrs backend: \n" " --async-lrs-base-url TEXT\n" " --async-lrs-headers KEY=VALUE,KEY=VALUE\n" + " --async-lrs-locale-encoding TEXT\n" " --async-lrs-password TEXT\n" + " --async-lrs-read-chunk-size INTEGER\n" " --async-lrs-statements-endpoint TEXT\n" " --async-lrs-status-endpoint TEXT\n" " --async-lrs-username TEXT\n" + " --async-lrs-write-chunk-size INTEGER\n" " async_mongo backend: \n" " --async-mongo-client-options KEY=VALUE,KEY=VALUE\n" " --async-mongo-connection-uri MONGODSN\n" @@ -180,10 +183,13 @@ def 
test_cli_read_command_usage(): " lrs backend: \n" " --lrs-base-url TEXT\n" " --lrs-headers KEY=VALUE,KEY=VALUE\n" + " --lrs-locale-encoding TEXT\n" " --lrs-password TEXT\n" + " --lrs-read-chunk-size INTEGER\n" " --lrs-statements-endpoint TEXT\n" " --lrs-status-endpoint TEXT\n" " --lrs-username TEXT\n" + " --lrs-write-chunk-size INTEGER\n" " mongo backend: \n" " --mongo-client-options KEY=VALUE,KEY=VALUE\n" " --mongo-connection-uri MONGODSN\n" @@ -219,7 +225,7 @@ def test_cli_read_command_usage(): " --swift-write-chunk-size INTEGER\n" " ws backend: \n" " --ws-uri TEXT\n" - " -c, --chunk-size INTEGER Get events by chunks of size #\n" + " -s, --chunk-size INTEGER Get events by chunks of size #\n" " -t, --target TEXT Endpoint from which to read events (e.g.\n" " `/statements`)\n" ' -q, --query \'{"KEY": "VALUE", "KEY": "VALUE"}\'\n' @@ -229,8 +235,8 @@ def test_cli_read_command_usage(): " -i, --ignore_errors BOOLEAN Ignore errors during the encoding operation." "\n" " [default: False]\n" - " --help Show this message and exit." 
- ) in result.output + " --help Show this message and exit.\n" + ) == result.output logging.warning(result.output) result = runner.invoke(cli, ["read"]) assert result.exit_code > 0 @@ -346,7 +352,7 @@ def test_cli_list_command_usage(): " -n, --new / -a, --all List not fetched (or all) documents\n" " -D, --details / -I, --ids Get documents detailed output (JSON)\n" " --help Show this message and exit.\n" - ) in result.output + ) == result.output result = runner.invoke(cli, ["list"]) assert result.exit_code > 0 @@ -385,10 +391,13 @@ def test_cli_write_command_usage(): " async_lrs backend: \n" " --async-lrs-base-url TEXT\n" " --async-lrs-headers KEY=VALUE,KEY=VALUE\n" + " --async-lrs-locale-encoding TEXT\n" " --async-lrs-password TEXT\n" + " --async-lrs-read-chunk-size INTEGER\n" " --async-lrs-statements-endpoint TEXT\n" " --async-lrs-status-endpoint TEXT\n" " --async-lrs-username TEXT\n" + " --async-lrs-write-chunk-size INTEGER\n" " async_mongo backend: \n" " --async-mongo-client-options KEY=VALUE,KEY=VALUE\n" " --async-mongo-connection-uri MONGODSN\n" @@ -427,10 +436,13 @@ def test_cli_write_command_usage(): " lrs backend: \n" " --lrs-base-url TEXT\n" " --lrs-headers KEY=VALUE,KEY=VALUE\n" + " --lrs-locale-encoding TEXT\n" " --lrs-password TEXT\n" + " --lrs-read-chunk-size INTEGER\n" " --lrs-statements-endpoint TEXT\n" " --lrs-status-endpoint TEXT\n" " --lrs-username TEXT\n" + " --lrs-write-chunk-size INTEGER\n" " mongo backend: \n" " --mongo-client-options KEY=VALUE,KEY=VALUE\n" " --mongo-connection-uri MONGODSN\n" @@ -464,22 +476,18 @@ def test_cli_write_command_usage(): " --swift-username TEXT\n" " --swift-user-domain-name TEXT\n" " --swift-write-chunk-size INTEGER\n" - " -c, --chunk-size INTEGER Get events by chunks of size #\n" - " -f, --force Overwrite existing archives or records\n" + " -t, --target TEXT The target container to write into\n" + " -s, --chunk-size INTEGER Get events by chunks of size #\n" " -I, --ignore-errors Continue writing regardless of 
raised errors" "\n" - " -s, --simultaneous With HTTP backend, POST all chunks\n" - " simultaneously (instead of sequentially)\n" - " -m, --max-num-simultaneous INTEGER\n" - " The maximum number of chunks to send at once" - ",\n" - " when using `--simultaneous`. Use `-1` to not" - "\n" - " set a limit.\n" - " -t, --target TEXT The target container to write into\n" + " -o, --operation-type OP_TYPE Either index, create, delete, update or " + "append\n" + " -c, --concurrency INTEGER Number of chunks to write concurrently. (" + "async\n" + " backends only)\n" " --help Show this message and exit.\n" ) - assert expected_output in result.output + assert expected_output == result.output result = runner.invoke(cli, ["write"]) assert result.exit_code > 0 diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 88730ced5..1e2adb742 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -7,6 +7,8 @@ import pytest from click.testing import CliRunner +from ralph import cli + def test_dependencies_ralph_command_requires_click(monkeypatch): """Test Click module installation while executing the ralph command.""" @@ -30,16 +32,10 @@ def test_dependencies_ralph_command_requires_click(monkeypatch): def test_dependencies_runserver_subcommand_requires_uvicorn(monkeypatch): """Test Uvicorn module installation while executing the runserver sub command.""" - monkeypatch.setitem(sys.modules, "uvicorn", None) - - # Force ralph.cli reload now that uvicorn is considered as missing - if "ralph.cli" in sys.modules: - del sys.modules["ralph.cli"] - cli = importlib.import_module("ralph.cli") - + monkeypatch.delattr(cli, "uvicorn") + monkeypatch.setattr(cli, "configure_logging", lambda: None) runner = CliRunner() result = runner.invoke(cli.cli, "runserver -b es".split()) - assert isinstance(result.exception, ModuleNotFoundError) assert str(result.exception) == ( "You need to install 'lrs' optional dependencies to use the runserver "