diff --git a/.env.dist b/.env.dist index 80721d719..8a326f539 100644 --- a/.env.dist +++ b/.env.dist @@ -136,13 +136,13 @@ RALPH_BACKENDS__DATA__CLICKHOUSE__TEST_TABLE_NAME=test_xapi_events_all # LRS HTTP backend -RALPH_BACKENDS__HTTP__LRS__BASE_URL=http://ralph:secret@0.0.0.0:8100/ -RALPH_BACKENDS__HTTP__LRS__USERNAME=ralph -RALPH_BACKENDS__HTTP__LRS__PASSWORD=secret -RALPH_BACKENDS__HTTP__LRS__HEADERS__X_EXPERIENCE_API_VERSION=1.0.3 -RALPH_BACKENDS__HTTP__LRS__HEADERS__CONTENT_TYPE=application/json -RALPH_BACKENDS__HTTP__LRS__STATUS_ENDPOINT=/__heartbeat__ -RALPH_BACKENDS__HTTP__LRS__STATEMENTS_ENDPOINT=/xAPI/statements +RALPH_BACKENDS__DATA__LRS__BASE_URL=http://ralph:secret@0.0.0.0:8100/ +RALPH_BACKENDS__DATA__LRS__USERNAME=ralph +RALPH_BACKENDS__DATA__LRS__PASSWORD=secret +RALPH_BACKENDS__DATA__LRS__HEADERS__X_EXPERIENCE_API_VERSION=1.0.3 +RALPH_BACKENDS__DATA__LRS__HEADERS__CONTENT_TYPE=application/json +RALPH_BACKENDS__DATA__LRS__STATUS_ENDPOINT=/__heartbeat__ +RALPH_BACKENDS__DATA__LRS__STATEMENTS_ENDPOINT=/xAPI/statements # Sentry diff --git a/docs/backends/index.md b/docs/backends/index.md index 27f615241..0fabc5a4a 100644 --- a/docs/backends/index.md +++ b/docs/backends/index.md @@ -28,7 +28,7 @@ ralph list --backend swift # [...] more options The general patterns for backend parameters are: - `--{{ backend_name }}-{{ parameter | underscore_to_dash }}` for command options, and, -- `RALPH_BACKENDS__{{ backend_type | uppercase }}__{{ backend_name | uppercase }}__{{ parameter | uppercase }}` for environment variables, where the `backend_type` is one of `DATA`, `LRS` and `STREAM`. +- `RALPH_BACKENDS__DATA__{{ backend_name | uppercase }}__{{ parameter | uppercase }}` for environment variables. ## Elasticsearch @@ -69,6 +69,11 @@ documents from it. 
members: - attributes +The ClickHouse client options supported in Ralph can be found in these locations: + +- [Python driver specific](https://clickhouse.com/docs/en/integrations/language-clients/python/driver-api#settings-argument) +- [General ClickHouse client settings](https://clickhouse.com/docs/en/operations/settings/settings/) + ## OVH - Log Data Platform (LDP) LDP is a nice service built by OVH on top of Graylog to follow, analyse and @@ -150,23 +155,12 @@ The only required parameter is the `path` we want to list or stream content from members: - attributes -The ClickHouse client options supported in Ralph can be found in these locations: - -- [Python driver specific](https://clickhouse.com/docs/en/integrations/language-clients/python/driver-api#settings-argument) -- [General ClickHouse client settings](https://clickhouse.com/docs/en/operations/settings/settings/) - -## Learning Record Store (LRS) - HTTP backend interface - -!!! warning - - `HTTP` backend type will soon be merged into the `Data` backend type. - - PR [#521](https://github.com/openfun/ralph/pull/521) prepares `Data` backend alignments with `HTTP` backend `read` method signature +## Learning Record Store (LRS) The LRS backend is used to store and retrieve xAPI statements from various systems that follow the [xAPI specification](https://github.com/adlnet/xAPI-Spec/tree/master) (such as our own Ralph LRS, which can be run from this package). LRS systems are mostly used in e-learning infrastructures. 
-### ::: ralph.backends.http.async_lrs.LRSHTTPBackendSettings +### ::: ralph.backends.data.lrs.LRSDataBackendSettings handler: python options: show_root_heading: false diff --git a/src/ralph/backends/data/async_lrs.py b/src/ralph/backends/data/async_lrs.py new file mode 100644 index 000000000..8c8102144 --- /dev/null +++ b/src/ralph/backends/data/async_lrs.py @@ -0,0 +1,295 @@ +"""Async LRS data backend for Ralph.""" + +from io import IOBase +from typing import AsyncIterator, Iterable, Optional, Union +from urllib.parse import ParseResult, parse_qs, urljoin, urlparse + +from httpx import AsyncClient, HTTPError, HTTPStatusError, RequestError +from pydantic import AnyHttpUrl, PositiveInt, parse_obj_as + +from ralph.backends.data.base import ( + AsyncWritable, + BaseAsyncDataBackend, + BaseOperationType, + DataBackendStatus, + Loggable, +) +from ralph.backends.data.lrs import LRSDataBackendSettings, StatementResponse +from ralph.backends.lrs.base import LRSStatementsQuery +from ralph.exceptions import BackendException +from ralph.utils import async_parse_dict_to_bytes, iter_by_batch, parse_iterable_to_dict + + +class AsyncLRSDataBackend( + BaseAsyncDataBackend[LRSDataBackendSettings, LRSStatementsQuery], + AsyncWritable, + Loggable, +): + """Asynchronous LRS data backend.""" + + name = "async_lrs" + default_operation_type = BaseOperationType.CREATE + unsupported_operation_types = { + BaseOperationType.APPEND, + BaseOperationType.UPDATE, + BaseOperationType.DELETE, + } + + def __init__(self, settings: Optional[LRSDataBackendSettings] = None) -> None: + """Instantiate the LRS HTTP (basic auth) backend client. + + Args: + settings (LRSDataBackendSettings or None): The LRS data backend settings. + If `settings` is `None`, a default settings instance is used instead. 
+ """ + super().__init__(settings) + self.base_url = parse_obj_as(AnyHttpUrl, self.settings.BASE_URL) + self.auth = (self.settings.USERNAME, self.settings.PASSWORD) + self._client = None + + @property + def client(self) -> AsyncClient: + """Create a `httpx.AsyncClient` if it doesn't exist.""" + if not self._client: + headers = self.settings.HEADERS.dict(by_alias=True) + self._client = AsyncClient(auth=self.auth, headers=headers) + return self._client + + async def status(self) -> DataBackendStatus: + """HTTP backend check for server status.""" + status_url = urljoin(self.base_url, self.settings.STATUS_ENDPOINT) + try: + response = await self.client.get(status_url) + response.raise_for_status() + except RequestError: + self.logger.error("Unable to request the server") + return DataBackendStatus.AWAY + except HTTPStatusError: + self.logger.error("Response raised an HTTP status of 4xx or 5xx") + return DataBackendStatus.ERROR + + return DataBackendStatus.OK + + async def read( # noqa: PLR0913 + self, + query: Optional[Union[str, LRSStatementsQuery]] = None, + target: Optional[str] = None, + chunk_size: Optional[int] = None, + raw_output: bool = False, + ignore_errors: bool = False, + prefetch: Optional[PositiveInt] = None, + max_statements: Optional[PositiveInt] = None, + ) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]: + """Get statements from LRS `target` endpoint. + + The `read` method defined in the LRS specification returns `statements` array + and `more` IRL. The latter is required when the returned `statement` list has + been limited. + + Args: + query (str, LRSStatementsQuery): The query to select records to read. + target (str): Endpoint from which to read data (e.g. `/statements`). + If target is `None`, `/xAPI/statements` default endpoint is used. + chunk_size (int or None): The number of records or bytes to read in one + batch, depending on whether the records are dictionaries or bytes. 
+ raw_output (bool): Controls whether to yield bytes or dictionaries. + If the records are dictionaries and `raw_output` is set to `True`, they + are encoded as JSON. + If the records are bytes and `raw_output` is set to `False`, they are + decoded as JSON by line. + ignore_errors (bool): If `True`, errors during the read operation + are ignored and logged. If `False` (default), a `BackendException` + is raised if an error occurs. + prefetch (int): The number of records to prefetch (queue) while yielding. + If `prefetch` is `None` it defaults to `1` - no records are prefetched. + max_statements: The maximum number of statements to yield. + """ + statements = super().read( + query, + target, + chunk_size, + raw_output, + ignore_errors, + prefetch, + max_statements, + ) + async for statement in statements: + yield statement + + async def _read_bytes( + self, + query: LRSStatementsQuery, + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + ) -> AsyncIterator[bytes]: + """Method called by `self.read` yielding bytes. See `self.read`.""" + statements = self._read_dicts(query, target, chunk_size, ignore_errors) + async for statement in async_parse_dict_to_bytes( + statements, self.settings.LOCALE_ENCODING, ignore_errors, self.logger + ): + yield statement + + async def _read_dicts( + self, + query: LRSStatementsQuery, + target: Optional[str], + chunk_size: int, + ignore_errors: bool, # noqa: ARG002 + ) -> AsyncIterator[dict]: + """Method called by `self.read` yielding dictionaries. See `self.read`.""" + if not target: + target = self.settings.STATEMENTS_ENDPOINT + + if query.limit: + self.logger.warning( + "The limit query parameter value is overwritten by the chunk_size " + "parameter value." 
+ ) + + query.limit = chunk_size + + # Create request URL + target = ParseResult( + scheme=urlparse(self.base_url).scheme, + netloc=urlparse(self.base_url).netloc, + path=target, + query="", + params="", + fragment="", + ).geturl() + + statements = self._fetch_statements( + target=target, + query_params=query.dict(exclude_none=True, exclude_unset=True), + ) + + # Iterate through results + try: + async for statement in statements: + yield statement + except HTTPError as error: + msg = "Failed to fetch statements: %s" + self.logger.error(msg, error) + raise BackendException(msg % (error,)) from error + + async def write( # noqa: PLR0913 + self, + data: Union[IOBase, Iterable[bytes], Iterable[dict]], + target: Optional[str] = None, + chunk_size: Optional[int] = None, + ignore_errors: bool = False, + operation_type: Optional[BaseOperationType] = None, + concurrency: Optional[int] = None, + ) -> int: + """Write `data` records to the `target` endpoint and return their count. + + Args: + data: (Iterable): The data to write. + target (str or None): Endpoint in which to write data (e.g. `/statements`). + If `target` is `None`, `/xAPI/statements` default endpoint is used. + chunk_size (int or None): The number of records or bytes to write in one + batch, depending on whether `data` contains dictionaries or bytes. + If `chunk_size` is `None`, a default value is used instead. + ignore_errors (bool): If `True`, errors during the write operation + are ignored and logged. If `False` (default), a `BackendException` + is raised if an error occurs. + operation_type (BaseOperationType or None): The mode of the write operation. + If `operation_type` is `None`, the `default_operation_type` is used + instead. See `BaseOperationType`. + concurrency (int): The number of chunks to write concurrently. + If `None` it defaults to `1`. 
+ """ + return await super().write( + data, target, chunk_size, ignore_errors, operation_type, concurrency + ) + + async def _write_bytes( # noqa: PLR0913 + self, + data: Iterable[bytes], + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + operation_type: BaseOperationType, + ) -> int: + """Method called by `self.write` writing bytes. See `self.write`.""" + statements = parse_iterable_to_dict(data, ignore_errors, self.logger) + return await self._write_dicts( + statements, target, chunk_size, ignore_errors, operation_type + ) + + async def _write_dicts( # noqa: PLR0913 + self, + data: Iterable[dict], + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + operation_type: BaseOperationType, # noqa: ARG002 + ) -> int: + """Method called by `self.write` writing dictionaries. See `self.write`.""" + if not target: + target = self.settings.STATEMENTS_ENDPOINT + + target = ParseResult( + scheme=urlparse(self.base_url).scheme, + netloc=urlparse(self.base_url).netloc, + path=target, + query="", + params="", + fragment="", + ).geturl() + + self.logger.debug( + "Start writing to the %s endpoint (chunk size: %s)", target, chunk_size + ) + + count = 0 + for chunk in iter_by_batch(data, chunk_size): + count += await self._post_and_raise_for_status(target, chunk, ignore_errors) + + self.logger.debug("Posted %d statements", count) + return count + + async def close(self) -> None: + """Close the `httpx.AsyncClient`. + + Raise: + BackendException: If a failure occurs during the close operation. 
+ """ + if not self._client: + self.logger.warning("No backend client to close.") + return + + await self.client.aclose() + + async def _fetch_statements(self, target, query_params: dict): + """Fetch statements from a LRS.""" + while True: + response = await self.client.get(target, params=query_params) + response.raise_for_status() + statements_response = StatementResponse(**response.json()) + statements = statements_response.statements + if isinstance(statements, dict): + statements = [statements] + + for statement in statements: + yield statement + + if not statements_response.more: + break + + query_params.update(parse_qs(urlparse(statements_response.more).query)) + + async def _post_and_raise_for_status(self, target, chunk, ignore_errors): + """POST chunk of statements to `target` and return the number of insertions.""" + try: + request = await self.client.post(target, json=chunk) + request.raise_for_status() + return len(chunk) + except HTTPError as error: + msg = "Failed to post statements: %s" + if ignore_errors: + self.logger.warning(msg, error) + return 0 + self.logger.error(msg, error) + raise BackendException(msg % (error,)) from error diff --git a/src/ralph/backends/data/fs.py b/src/ralph/backends/data/fs.py index 228ec0e5f..f8db75979 100644 --- a/src/ralph/backends/data/fs.py +++ b/src/ralph/backends/data/fs.py @@ -321,8 +321,8 @@ def _write_bytes( # noqa: PLR0913 msg = "Target file not specified; using random file name: %s" self.logger.info(msg, target) - target = Path(target) - path = target if target.is_absolute() else self.default_directory / target + path = Path(target) + path = path if path.is_absolute() else self.default_directory / path if operation_type in [BaseOperationType.CREATE, BaseOperationType.INDEX]: if path.is_file(): @@ -363,6 +363,7 @@ def _write_bytes( # noqa: PLR0913 "timestamp": now(), } ) + self.logger.debug("Written %s with success", path.absolute()) return 1 def close(self) -> None: diff --git a/src/ralph/backends/data/lrs.py 
b/src/ralph/backends/data/lrs.py new file mode 100644 index 000000000..479321692 --- /dev/null +++ b/src/ralph/backends/data/lrs.py @@ -0,0 +1,319 @@ +"""LRS data backend for Ralph.""" + +from io import IOBase +from typing import Iterable, Iterator, List, Optional, Union +from urllib.parse import ParseResult, parse_qs, urljoin, urlparse + +from httpx import Client, HTTPError, HTTPStatusError, RequestError +from pydantic import AnyHttpUrl, BaseModel, Field, PositiveInt, parse_obj_as + +from ralph.backends.data.base import ( + BaseDataBackend, + BaseDataBackendSettings, + BaseOperationType, + DataBackendStatus, + Loggable, + Writable, +) +from ralph.backends.lrs.base import LRSStatementsQuery +from ralph.conf import BaseSettingsConfig, HeadersParameters +from ralph.exceptions import BackendException +from ralph.utils import iter_by_batch, parse_dict_to_bytes, parse_iterable_to_dict + + +class LRSHeaders(HeadersParameters): + """Pydantic model for LRS headers.""" + + X_EXPERIENCE_API_VERSION: str = Field("1.0.3", alias="X-Experience-API-Version") + CONTENT_TYPE: str = Field("application/json", alias="content-type") + + +class LRSDataBackendSettings(BaseDataBackendSettings): + """LRS data backend default configuration. + + Attributes: + BASE_URL (AnyHttpUrl): LRS server URL. + USERNAME (str): Basic auth username for LRS authentication. + PASSWORD (str): Basic auth password for LRS authentication. + HEADERS (dict): Headers defined for the LRS server connection. + LOCALE_ENCODING (str): The encoding used for reading statements. + READ_CHUNK_SIZE (int): The default chunk size for reading statements. + STATUS_ENDPOINT (str): Endpoint used to check server status. + STATEMENTS_ENDPOINT (str): Default endpoint for LRS statements resource. + WRITE_CHUNK_SIZE (int): The default chunk size for writing statements. 
+ """ + + class Config(BaseSettingsConfig): + """Pydantic Configuration.""" + + env_prefix = "RALPH_BACKENDS__DATA__LRS__" + + BASE_URL: AnyHttpUrl = Field("http://0.0.0.0:8100") + USERNAME: str = "ralph" + PASSWORD: str = "secret" + HEADERS: LRSHeaders = LRSHeaders() + STATUS_ENDPOINT: str = "/__heartbeat__" + STATEMENTS_ENDPOINT: str = "/xAPI/statements" + + +class StatementResponse(BaseModel): + """Pydantic model for `get` statements response.""" + + statements: Union[List[dict], dict] + more: Optional[str] + + +class LRSDataBackend( + BaseDataBackend[LRSDataBackendSettings, LRSStatementsQuery], Writable, Loggable +): + """LRS data backend.""" + + name = "lrs" + default_operation_type = BaseOperationType.CREATE + unsupported_operation_types = { + BaseOperationType.APPEND, + BaseOperationType.UPDATE, + BaseOperationType.DELETE, + } + + def __init__(self, settings: Optional[LRSDataBackendSettings] = None) -> None: + """Instantiate the LRS HTTP (basic auth) backend client. + + Args: + settings (LRSDataBackendSettings or None): The LRS data backend settings. + If `settings` is `None`, a default settings instance is used instead. 
+ """ + super().__init__(settings) + self.base_url = parse_obj_as(AnyHttpUrl, self.settings.BASE_URL) + self.auth = (self.settings.USERNAME, self.settings.PASSWORD) + self._client = None + + @property + def client(self) -> Client: + """Create a `httpx.Client` if it doesn't exist.""" + if not self._client: + headers = self.settings.HEADERS.dict(by_alias=True) + self._client = Client(auth=self.auth, headers=headers) + return self._client + + def status(self) -> DataBackendStatus: + """HTTP backend check for server status.""" + status_url = urljoin(self.base_url, self.settings.STATUS_ENDPOINT) + try: + response = self.client.get(status_url) + response.raise_for_status() + except RequestError: + self.logger.error("Unable to request the server") + return DataBackendStatus.AWAY + except HTTPStatusError: + self.logger.error("Response raised an HTTP status of 4xx or 5xx") + return DataBackendStatus.ERROR + + return DataBackendStatus.OK + + def read( # noqa: PLR0913 + self, + query: Optional[Union[str, LRSStatementsQuery]] = None, + target: Optional[str] = None, + chunk_size: Optional[int] = None, + raw_output: bool = False, + ignore_errors: bool = False, + max_statements: Optional[PositiveInt] = None, + ) -> Union[Iterator[bytes], Iterator[dict]]: + """Get statements from LRS `target` endpoint. + + The `read` method defined in the LRS specification returns `statements` array + and `more` IRL. The latter is required when the returned `statement` list has + been limited. + + Args: + query (str, LRSStatementsQuery): The query to select records to read. + target (str): Endpoint from which to read data (e.g. `/statements`). + If target is `None`, `/xAPI/statements` default endpoint is used. + chunk_size (int or None): The number of records or bytes to read in one + batch, depending on whether the records are dictionaries or bytes. + raw_output (bool): Controls whether to yield bytes or dictionaries. 
+ If the records are dictionaries and `raw_output` is set to `True`, they + are encoded as JSON. + If the records are bytes and `raw_output` is set to `False`, they are + decoded as JSON by line. + ignore_errors (bool): If `True`, errors during the read operation + are ignored and logged. If `False` (default), a `BackendException` + is raised if an error occurs. + max_statements: The maximum number of statements to yield. + """ + yield from super().read( + query, target, chunk_size, raw_output, ignore_errors, max_statements + ) + + def _read_bytes( + self, + query: LRSStatementsQuery, + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + ) -> Iterator[bytes]: + """Method called by `self.read` yielding bytes. See `self.read`.""" + statements = self._read_dicts(query, target, chunk_size, ignore_errors) + yield from parse_dict_to_bytes( + statements, self.settings.LOCALE_ENCODING, ignore_errors, self.logger + ) + + def _read_dicts( + self, + query: LRSStatementsQuery, + target: Optional[str], + chunk_size: int, + ignore_errors: bool, # noqa: ARG002 + ) -> Iterator[dict]: + """Method called by `self.read` yielding dictionaries. See `self.read`.""" + if not target: + target = self.settings.STATEMENTS_ENDPOINT + + if query.limit: + self.logger.warning( + "The limit query parameter value is overwritten by the chunk_size " + "parameter value." 
+ ) + + query.limit = chunk_size + + # Create request URL + target = ParseResult( + scheme=urlparse(self.base_url).scheme, + netloc=urlparse(self.base_url).netloc, + path=target, + query="", + params="", + fragment="", + ).geturl() + + statements = self._fetch_statements( + target=target, + query_params=query.dict(exclude_none=True, exclude_unset=True), + ) + + # Iterate through results + try: + for statement in statements: + yield statement + except HTTPError as error: + msg = "Failed to fetch statements: %s" + self.logger.error(msg, error) + raise BackendException(msg % (error,)) from error + + def write( # noqa: PLR0913 + self, + data: Union[IOBase, Iterable[bytes], Iterable[dict]], + target: Optional[str] = None, + chunk_size: Optional[int] = None, + ignore_errors: bool = False, + operation_type: Optional[BaseOperationType] = None, + ) -> int: + """Write `data` records to the `target` endpoint and return their count. + + Args: + data: (Iterable): The data to write. + target (str or None): Endpoint in which to write data (e.g. `/statements`). + If `target` is `None`, `/xAPI/statements` default endpoint is used. + chunk_size (int or None): The number of records or bytes to write in one + batch, depending on whether `data` contains dictionaries or bytes. + If `chunk_size` is `None`, a default value is used instead. + ignore_errors (bool): If `True`, errors during the write operation + are ignored and logged. If `False` (default), a `BackendException` + is raised if an error occurs. + operation_type (BaseOperationType or None): The mode of the write operation. + If `operation_type` is `None`, the `default_operation_type` is used + instead. See `BaseOperationType`. 
+ """ + return super().write(data, target, chunk_size, ignore_errors, operation_type) + + def _write_bytes( # noqa: PLR0913 + self, + data: Iterable[bytes], + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + operation_type: BaseOperationType, + ) -> int: + """Method called by `self.write` writing bytes. See `self.write`.""" + statements = parse_iterable_to_dict(data, ignore_errors, self.logger) + return self._write_dicts( + statements, target, chunk_size, ignore_errors, operation_type + ) + + def _write_dicts( # noqa: PLR0913 + self, + data: Iterable[dict], + target: Optional[str], + chunk_size: int, + ignore_errors: bool, + operation_type: BaseOperationType, # noqa: ARG002 + ) -> int: + """Method called by `self.write` writing dictionaries. See `self.write`.""" + if not target: + target = self.settings.STATEMENTS_ENDPOINT + + target = ParseResult( + scheme=urlparse(self.base_url).scheme, + netloc=urlparse(self.base_url).netloc, + path=target, + query="", + params="", + fragment="", + ).geturl() + + self.logger.debug( + "Start writing to the %s endpoint (chunk size: %s)", target, chunk_size + ) + + count = 0 + for chunk in iter_by_batch(data, chunk_size): + count += self._post_and_raise_for_status(target, chunk, ignore_errors) + + self.logger.debug("Posted %d statements", count) + return count + + def close(self) -> None: + """Close the `httpx.Client`. + + Raise: + BackendException: If a failure occurs during the close operation. 
+ """ + if not self._client: + self.logger.warning("No backend client to close.") + return + + self.client.close() + + def _fetch_statements(self, target, query_params: dict): + """Fetch statements from a LRS.""" + while True: + response = self.client.get(target, params=query_params) + response.raise_for_status() + statements_response = StatementResponse(**response.json()) + statements = statements_response.statements + if isinstance(statements, dict): + statements = [statements] + + for statement in statements: + yield statement + + if not statements_response.more: + break + + query_params.update(parse_qs(urlparse(statements_response.more).query)) + + def _post_and_raise_for_status(self, target, chunk, ignore_errors): + """POST chunk of statements to `target` and return the number of insertions.""" + try: + request = self.client.post(target, json=chunk) + request.raise_for_status() + return len(chunk) + except HTTPError as error: + msg = "Failed to post statements: %s" + if ignore_errors: + self.logger.warning(msg, error) + return 0 + self.logger.error(msg, error) + raise BackendException(msg % (error,)) from error diff --git a/src/ralph/backends/http/async_lrs.py b/src/ralph/backends/http/async_lrs.py deleted file mode 100644 index b94e7ad34..000000000 --- a/src/ralph/backends/http/async_lrs.py +++ /dev/null @@ -1,392 +0,0 @@ -"""Async LRS HTTP backend for Ralph.""" - -import asyncio -import json -import logging -from itertools import chain -from typing import Iterable, Iterator, List, Optional, Union -from urllib.parse import ParseResult, parse_qs, urljoin, urlparse - -from httpx import AsyncClient, HTTPError, HTTPStatusError, RequestError -from pydantic import AnyHttpUrl, BaseModel, Field, parse_obj_as -from pydantic.types import PositiveInt - -from ralph.backends.lrs.base import LRSStatementsQuery -from ralph.conf import BaseSettingsConfig, HeadersParameters -from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import
gather_with_limited_concurrency, iter_by_batch - -from .base import ( - BaseHTTPBackend, - BaseHTTPBackendSettings, - HTTPBackendStatus, - OperationType, - enforce_query_checks, -) - -logger = logging.getLogger(__name__) - - -class LRSHeaders(HeadersParameters): - """Pydantic model for LRS headers.""" - - X_EXPERIENCE_API_VERSION: str = Field("1.0.3", alias="X-Experience-API-Version") - CONTENT_TYPE: str = Field("application/json", alias="content-type") - - -class LRSHTTPBackendSettings(BaseHTTPBackendSettings): - """LRS HTTP backend default configuration. - - Attributes: - BASE_URL (AnyHttpUrl): LRS server URL. - USERNAME (str): Basic auth username for LRS authentication. - PASSWORD (str): Basic auth password for LRS authentication. - HEADERS (dict): Headers defined for the LRS server connection. - STATUS_ENDPOINT (str): Endpoint used to check server status. - STATEMENTS_ENDPOINT (str): Default endpoint for LRS statements resource. - """ - - class Config(BaseSettingsConfig): - """Pydantic Configuration.""" - - env_prefix = "RALPH_BACKENDS__HTTP__LRS__" - - BASE_URL: AnyHttpUrl = Field("http://0.0.0.0:8100") - USERNAME: str = "ralph" - PASSWORD: str = "secret" - HEADERS: LRSHeaders = LRSHeaders() - STATUS_ENDPOINT: str = "/__heartbeat__" - STATEMENTS_ENDPOINT: str = "/xAPI/statements" - - -class StatementResponse(BaseModel): - """Pydantic model for `get` statements response.""" - - statements: Union[List[dict], dict] - more: Optional[str] - - -class AsyncLRSHTTPBackend(BaseHTTPBackend): - """Asynchronous LRS HTTP backend.""" - - name = "async_lrs" - query = LRSStatementsQuery - default_operation_type = OperationType.CREATE - settings_class = LRSHTTPBackendSettings - - def __init__(self, settings: Optional[LRSHTTPBackendSettings] = None): - """Instantiate the LRS HTTP (basic auth) backend client. - - Args: - settings (LRSHTTPBackendSettings or None): The LRS HTTP backend settings. - If `settings` is `None`, a default settings instance is used instead. 
- """ - self.settings = settings if settings else self.settings_class() - - self.base_url = parse_obj_as(AnyHttpUrl, self.settings.BASE_URL) - self.auth = (self.settings.USERNAME, self.settings.PASSWORD) - - async def status(self) -> HTTPBackendStatus: - """HTTP backend check for server status.""" - status_url = urljoin(self.base_url, self.settings.STATUS_ENDPOINT) - - try: - async with AsyncClient() as client: - response = await client.get(status_url) - response.raise_for_status() - except RequestError: - msg = "Unable to request the server" - logger.error(msg) - return HTTPBackendStatus.AWAY - except HTTPStatusError: - msg = "Response raised an HTTP status of 4xx or 5xx" - logger.error(msg) - return HTTPBackendStatus.ERROR - - return HTTPBackendStatus.OK - - async def list( - self, - target: Optional[str] = None, - details: bool = False, # noqa: ARG002 - new: bool = False, # noqa: ARG002 - ) -> Iterator[Union[str, dict]]: - """Raise error for unsupported `list` method.""" - msg = "LRS HTTP backend does not support `list` method, cannot list from %s" - - logger.error(msg, target) - raise NotImplementedError(msg % target) - - @enforce_query_checks - async def read( # noqa: PLR0913 - self, - query: Optional[Union[str, LRSStatementsQuery]] = None, - target: Optional[str] = None, - chunk_size: Optional[PositiveInt] = 500, - raw_output: bool = False, - ignore_errors: bool = False, # noqa: ARG002 - greedy: bool = True, - max_statements: Optional[PositiveInt] = None, - ) -> Iterator[Union[bytes, dict]]: - """Get statements from LRS `target` endpoint. - - The `read` method defined in the LRS specification returns `statements` array - and `more` IRL. The latter is required when the returned `statement` list has - been limited. - - Args: - query (str, LRSStatementsQuery): The query to select records to read. - target (str): Endpoint from which to read data (e.g. `/statements`). - If target is `None`, `/xAPI/statements` default endpoint is used. 
- chunk_size (int or None): The number of records or bytes to read in one - batch, depending on whether the records are dictionaries or bytes. - raw_output (bool): Controls whether to yield bytes or dictionaries. - If the records are dictionaries and `raw_output` is set to `True`, they - are encoded as JSON. - If the records are bytes and `raw_output` is set to `False`, they are - decoded as JSON by line. - ignore_errors (bool): If `True`, errors during the read operation - are ignored and logged. If `False` (default), a `BackendException` - is raised if an error occurs. - greedy: If set to True, the client will fetch all available pages even - before the statements yielded by the generator are consumed. Caution: - this might potentially lead to large amounts of API calls and to the - memory filling up. - max_statements: The maximum number of statements to yield. - """ - if not target: - target = self.settings.STATEMENTS_ENDPOINT - - if query and query.limit: - logger.warning( - "The limit query parameter value is overwritten by the chunk_size " - "parameter value." 
- ) - - if query is None: - query = LRSStatementsQuery() - - query.limit = chunk_size - - # Create request URL - target = ParseResult( - scheme=urlparse(self.base_url).scheme, - netloc=urlparse(self.base_url).netloc, - path=target, - query="", - params="", - fragment="", - ).geturl() - - # Select the appropriate fetch function - if greedy: - statements_async_generator = self._greedy_fetch_statements( - target, raw_output, query.dict(exclude_none=True, exclude_unset=True) - ) - else: - statements_async_generator = self._fetch_statements( - target=target, - raw_output=raw_output, - query_params=query.dict(exclude_none=True, exclude_unset=True), - ) - - # Iterate through results - counter = 0 - try: - async for statement in statements_async_generator: - if max_statements and (counter >= max_statements): - break - yield statement - counter += 1 - except HTTPError as error: - msg = "Failed to fetch statements." - logger.error("%s. %s", msg, error) - raise BackendException(msg, *error.args) from error - - async def write( # noqa: PLR0913 - self, - data: Union[Iterable[bytes], Iterable[dict]], - target: Optional[str] = None, - chunk_size: Optional[PositiveInt] = 500, - ignore_errors: bool = False, - operation_type: Optional[OperationType] = None, - simultaneous: bool = False, - max_num_simultaneous: Optional[PositiveInt] = None, - ) -> int: - """Write `data` records to the `target` endpoint and return their count. - - Args: - data: (Iterable): The data to write. - target (str or None): Endpoint in which to write data (e.g. `/statements`). - If `target` is `None`, `/xAPI/statements` default endpoint is used. - chunk_size (int or None): The number of records or bytes to write in one - batch, depending on whether `data` contains dictionaries or bytes. - If `chunk_size` is `None`, a default value is used instead. - ignore_errors (bool): If `True`, errors during the write operation - are ignored and logged. 
If `False` (default), a `BackendException` - is raised if an error occurs. - operation_type (OperationType or None): The mode of the write operation. - If `operation_type` is `None`, the `default_operation_type` is used - instead. See `OperationType`. - simultaneous (bool): If `True`, chunks requests will be made concurrently. - If `False` (default), chunks will be sent sequentially - max_num_simultaneous (int or None): If simultaneous is `True`, the maximum - number of chunks to POST concurrently. If `None` (default), no limit is - set. - """ - data = iter(data) - try: - first_record = next(data) - except StopIteration: - logger.info("Data Iterator is empty; skipping write to target") - return 0 - - if not operation_type: - operation_type = self.default_operation_type - - if operation_type in ( - OperationType.APPEND, - OperationType.UPDATE, - OperationType.DELETE, - ): - msg = f"{operation_type.value} operation type is not supported." - logger.error(msg) - raise BackendParameterException(msg) - - if not target: - target = self.settings.STATEMENTS_ENDPOINT - - if not chunk_size: - chunk_size = 500 - - target = ParseResult( - scheme=urlparse(self.base_url).scheme, - netloc=urlparse(self.base_url).netloc, - path=target, - query="", - params="", - fragment="", - ).geturl() - - if (max_num_simultaneous is not None) and max_num_simultaneous < 1: - msg = "max_num_simultaneous must be a strictly positive integer" - logger.error(msg) - raise BackendParameterException(msg) - - if (not simultaneous) and (max_num_simultaneous is not None): - msg = "max_num_simultaneous can only be used with `simultaneous=True`" - logger.error(msg) - raise BackendParameterException(msg) - - # Gather only one chunk at a time when not using the simultaneous option - if not simultaneous: - max_num_simultaneous = 1 - - data = chain((first_record,), data) - - logger.debug( - "Start writing to the %s endpoint (chunk size: %s)", target, chunk_size - ) - - # Create tasks - tasks = set() - for chunk 
in iter_by_batch(data, chunk_size): - tasks.add(self._post_and_raise_for_status(target, chunk, ignore_errors)) - - # Run POST tasks - result = await gather_with_limited_concurrency(max_num_simultaneous, *tasks) - statements_count = sum(result) - - logger.debug("Posted %d statements", statements_count) - return statements_count - - async def _fetch_statements(self, target, raw_output, query_params: dict): - """Fetch statements from a LRS. Used in `read`.""" - async with AsyncClient( - auth=self.auth, headers=self.settings.HEADERS.dict(by_alias=True) - ) as client: - while True: - response = await client.get(target, params=query_params) - response.raise_for_status() - statements_response = StatementResponse.parse_obj(response.json()) - statements = statements_response.statements - statements = ( - [statements] if not isinstance(statements, list) else statements - ) - if raw_output: - for statement in statements: - yield bytes(json.dumps(statement), encoding="utf-8") - else: - for statement in statements: - yield statement - - if not statements_response.more: - break - - query_params.update(parse_qs(urlparse(statements_response.more).query)) - - async def _greedy_fetch_statements(self, target, raw_output, query_params: dict): - """Fetch as many statements as possible and yield when they are available. - - This may be used in the context of paginated results, to allow processing - of statements while the following page is being fetched. Implementation of - this function relies on the class method `_fetch_statements`, which must yield - statements. 
- """ - queue = asyncio.Queue() - - async def fetch_all_statements(queue): - """Fetch all statements and put them in a queue.""" - try: - async for statement in self._fetch_statements( - target=target, raw_output=raw_output, query_params=query_params - ): - await queue.put(statement) - # Re-raising exceptions is necessary as create_task fails silently - except Exception as exception: - # None signals that the queue is done - await queue.put(None) - raise exception - await queue.put(None) - - # Run fetch_all_statements in the background - task = asyncio.create_task(fetch_all_statements(queue)) - - # Yield statements as they arrive - while True: - statement = await queue.get() - if statement is None: - # Check for exception in fetch_all_statements - if task.exception(): - raise task.exception() - break - yield statement - - async def _post_and_raise_for_status(self, target, chunk, ignore_errors): - """POST chunk of statements to `target` and return the number of insertions. - - For use in `write`. - """ - async with AsyncClient(auth=self.auth, headers=self.settings.HEADERS) as client: - try: - request = await client.post( - # Encode data to allow async post - target, - content=json.dumps(list(chunk)).encode("utf-8"), - ) - except TypeError as error: - msg = f"Failed to encode JSON: {error}, for document {chunk}" - logger.error(msg) - if ignore_errors: - return 0 - raise BackendException(msg) from error - - try: - request.raise_for_status() - return len(chunk) - except HTTPError as error: - msg = "Failed to post statements" - logger.error("%s. 
%s", msg, error) - if ignore_errors: - return 0 - raise BackendException(msg, *error.args) from error diff --git a/src/ralph/backends/http/base.py b/src/ralph/backends/http/base.py deleted file mode 100644 index 4f3b212ec..000000000 --- a/src/ralph/backends/http/base.py +++ /dev/null @@ -1,151 +0,0 @@ -"""Base HTTP backend for Ralph.""" - -import functools -import logging -from abc import ABC, abstractmethod -from enum import Enum, unique -from typing import Iterator, List, Optional, Union - -from pydantic import BaseModel, BaseSettings, ValidationError -from pydantic.types import PositiveInt - -from ralph.conf import BaseSettingsConfig, core_settings -from ralph.exceptions import BackendParameterException - -logger = logging.getLogger(__name__) - - -class BaseHTTPBackendSettings(BaseSettings): - """Data backend default configuration.""" - - class Config(BaseSettingsConfig): - """Pydantic Configuration.""" - - env_prefix = "RALPH_BACKENDS__HTTP__" - env_file = ".env" - env_file_encoding = core_settings.LOCALE_ENCODING - - -@unique -class HTTPBackendStatus(Enum): - """HTTP backend statuses.""" - - OK = "ok" - AWAY = "away" - ERROR = "error" - - -class OperationType(Enum): - """Data backend operation types. - - Attributes: - INDEX (str): creates a new record with a specific ID. - CREATE (str): creates a new record without a specific ID. - DELETE (str): deletes an existing record. - UPDATE (str): updates or overwrites an existing record. - APPEND (str): creates or appends data to an existing record. 
- """ - - INDEX = "index" - CREATE = "create" - DELETE = "delete" - UPDATE = "update" - APPEND = "append" - - -def enforce_query_checks(method): - """Enforce query argument type checking for methods using it.""" - - @functools.wraps(method) - def wrapper(*args, **kwargs): - """Wrap method execution.""" - query = kwargs.pop("query", None) - self_ = args[0] - - return method(*args, query=self_.validate_query(query), **kwargs) - - return wrapper - - -class BaseQuery(BaseModel): - """Base query model.""" - - class Config: - """Base query model configuration.""" - - extra = "forbid" - - query_string: Optional[str] - - -class BaseHTTPBackend(ABC): - """Base HTTP backend interface.""" - - name = "base" - query = BaseQuery - - def validate_query( - self, query: Optional[Union[str, dict, BaseQuery]] = None - ) -> BaseQuery: - """Validate and transforms the query.""" - if query is None: - query = self.query() - - if isinstance(query, str): - query = self.query(query_string=query) - - if isinstance(query, dict): - try: - query = self.query(**query) - except ValidationError as err: - raise BackendParameterException( - "The 'query' argument is expected to be a " - f"{self.query.__name__} instance. {err.errors()}" - ) from err - - if not isinstance(query, self.query): - raise BackendParameterException( - "The 'query' argument is expected to be a " - f"{self.query.__name__} instance." - ) - - logger.debug("Query: %s", str(query)) - - return query - - @abstractmethod - async def list( - self, target: Optional[str] = None, details: bool = False, new: bool = False - ) -> Iterator[Union[str, dict]]: - """List containers in the data backend. 
E.g., collections, files, indexes.""" - - @abstractmethod - async def status(self) -> HTTPBackendStatus: - """Implement HTTP backend check for server status.""" - - @abstractmethod - @enforce_query_checks - async def read( # noqa: PLR0913 - self, - query: Optional[Union[str, BaseQuery]] = None, - target: Optional[str] = None, - chunk_size: Optional[PositiveInt] = 500, - raw_output: bool = False, - ignore_errors: bool = False, - greedy: bool = False, - max_statements: Optional[PositiveInt] = None, - ) -> Iterator[Union[bytes, dict]]: - """Yield records read from the HTTP response results.""" - - @abstractmethod - async def write( # noqa: PLR0913 - self, - data: Union[List[bytes], List[dict]], - target: Optional[str] = None, - chunk_size: Optional[PositiveInt] = 500, - ignore_errors: bool = False, - operation_type: Optional[OperationType] = None, - simultaneous: bool = False, - max_num_simultaneous: Optional[int] = None, - ) -> int: - """Write statements into the HTTP server given an input endpoint.""" diff --git a/src/ralph/backends/http/lrs.py b/src/ralph/backends/http/lrs.py deleted file mode 100644 index 1049f08fa..000000000 --- a/src/ralph/backends/http/lrs.py +++ /dev/null @@ -1,71 +0,0 @@ -"""LRS HTTP backend for Ralph.""" -import asyncio -from typing import Iterator, Union - -from ralph.backends.http.async_lrs import AsyncLRSHTTPBackend -from ralph.backends.http.base import HTTPBackendStatus - - -def _ensure_running_loop_uniqueness(func): - """Raise an error when methods are used in a running asyncio events loop.""" - - def wrap(*args, **kwargs): - """Wrapper for decorator function.""" - try: - loop = asyncio.get_running_loop() - except RuntimeError: - return func(*args, **kwargs) - if loop.is_running(): - raise RuntimeError( - f"This event loop is already running. You must use " - f"`AsyncLRSHTTPBackend.{func.__name__}` (instead of `LRSHTTPBackend." - f"{func.__name__}`), or run this code outside the current" - " event loop." 
- ) - return func(*args, **kwargs) - - return wrap - - -class LRSHTTPBackend(AsyncLRSHTTPBackend): - """LRS HTTP backend.""" - - name = "lrs" - - @_ensure_running_loop_uniqueness - def status(self, *args, **kwargs) -> HTTPBackendStatus: - """HTTP backend check for server status.""" - return asyncio.get_event_loop().run_until_complete( - super().status(*args, **kwargs) - ) - - @_ensure_running_loop_uniqueness - def list(self, *args, **kwargs) -> Iterator[Union[str, dict]]: - """Raise error for unsupported `list` method.""" - return asyncio.get_event_loop().run_until_complete( - super().list(*args, **kwargs) - ) - - @_ensure_running_loop_uniqueness - def read(self, *args, **kwargs) -> Iterator[Union[bytes, dict]]: - """Get statements from LRS `target` endpoint. - - See AsyncLRSHTTP.read for more information. - """ - async_statements_gen = super().read(*args, **kwargs) - loop = asyncio.get_event_loop() - try: - while True: - yield loop.run_until_complete(async_statements_gen.__anext__()) - except StopAsyncIteration: - pass - - @_ensure_running_loop_uniqueness - def write(self, *args, **kwargs) -> int: - """Write `data` records to the `target` endpoint and return their count. - - See AsyncLRSHTTP.write for more information. 
- """ - return asyncio.get_event_loop().run_until_complete( - super().write(*args, **kwargs) - ) diff --git a/src/ralph/backends/loader.py b/src/ralph/backends/loader.py index cda3c2ac6..62e935076 100644 --- a/src/ralph/backends/loader.py +++ b/src/ralph/backends/loader.py @@ -16,7 +16,6 @@ Listable, Writable, ) -from ralph.backends.http.base import BaseHTTPBackend from ralph.backends.lrs.base import BaseAsyncLRSBackend, BaseLRSBackend from ralph.backends.stream.base import BaseStreamBackend @@ -78,13 +77,11 @@ def get_cli_backends() -> Dict[str, Type]: """Return Ralph's backend classes for cli usage.""" dotted_paths = ( "ralph.backends.data", - "ralph.backends.http", "ralph.backends.stream", ) base_backends = ( BaseAsyncDataBackend, BaseDataBackend, - BaseHTTPBackend, BaseStreamBackend, ) return get_backends(dotted_paths, base_backends) @@ -97,7 +94,7 @@ def get_cli_write_backends() -> Dict[str, Type]: return { name: backend for name, backend in backends.items() - if issubclass(backend, (Writable, AsyncWritable, BaseHTTPBackend)) + if issubclass(backend, (Writable, AsyncWritable)) } diff --git a/src/ralph/cli.py b/src/ralph/cli.py index c26e3beef..bd0674c22 100644 --- a/src/ralph/cli.py +++ b/src/ralph/cli.py @@ -4,7 +4,7 @@ import logging import re import sys -from inspect import isasyncgen, isclass, iscoroutinefunction +from inspect import isasyncgen, isclass from pathlib import Path from tempfile import NamedTemporaryFile from typing import Any, Callable, Dict, Optional, Type, Union @@ -29,11 +29,11 @@ from ralph import __version__ as ralph_version from ralph.backends.data.base import ( + AsyncWritable, BaseAsyncDataBackend, BaseDataBackend, BaseOperationType, ) -from ralph.backends.http.base import BaseHTTPBackend from ralph.backends.loader import ( get_cli_backends, get_cli_list_backends, @@ -609,7 +609,7 @@ def convert(from_, to_, ignore_errors, fail_on_unknown, **conversion_set_kwargs) @RalphCLI.lazy_backends_options(get_cli_backends) 
@click.argument("archive", required=False) @click.option( - "-c", + "-s", "--chunk-size", type=int, default=None, @@ -678,18 +678,6 @@ def read( # noqa: PLR0913 click.echo(statement, nl=False) elif isinstance(backend, BaseStreamBackend): backend.stream(sys.stdout.buffer) - elif isinstance(backend, BaseHTTPBackend): - if query is not None: - query = backend.query(query=query) - for statement in backend.read( - target=target, query=query, chunk_size=chunk_size - ): - click.echo( - bytes( - json.dumps(statement) if isinstance(statement, dict) else statement, - encoding="utf-8", - ) - ) else: msg = "Cannot find an implemented backend type for backend %s" logger.error(msg, backend) @@ -698,19 +686,19 @@ def read( # noqa: PLR0913 @RalphCLI.lazy_backends_options(get_cli_write_backends) @click.option( - "-c", + "-t", + "--target", + type=str, + default=None, + help="The target container to write into", +) +@click.option( + "-s", "--chunk-size", type=int, default=None, help="Get events by chunks of size #", ) -@click.option( - "-f", - "--force", - default=False, - is_flag=True, - help="Overwrite existing archives or records", -) @click.option( "-I", "--ignore-errors", @@ -719,37 +707,26 @@ def read( # noqa: PLR0913 help="Continue writing regardless of raised errors", ) @click.option( - "-s", - "--simultaneous", - default=False, - is_flag=True, - help="With HTTP backend, POST all chunks simultaneously (instead of sequentially)", -) -@click.option( - "-m", - "--max-num-simultaneous", - type=int, - default=-1, - help=( - "The maximum number of chunks to send at once, when using `--simultaneous`. " - "Use `-1` to not set a limit." 
- ), + "-o", + "--operation-type", + type=click.Choice([op_type.value for op_type in BaseOperationType]), + metavar="OP_TYPE", + required=False, + help="Either index, create, delete, update or append", ) @click.option( - "-t", - "--target", - type=str, - default=None, - help="The target container to write into", + "-c", + "--concurrency", + default=1, + help="Number of chunks to write concurrently. (async backends only)", ) def write( # noqa: PLR0913 backend, + target, chunk_size, - force, ignore_errors, - simultaneous, - max_num_simultaneous, - target, + operation_type, + concurrency, **options, ): """Write an archive to a configured backend.""" @@ -757,40 +734,23 @@ def write( # noqa: PLR0913 logger.debug("Backend parameters: %s", options) - if max_num_simultaneous == 1: - max_num_simultaneous = None - backend_class = get_backend_class(get_cli_write_backends(), backend) backend = get_backend_instance(backend_class, options) - if isinstance(backend, (BaseDataBackend, BaseAsyncDataBackend)): - writer = ( - execute_async(backend.write) - if iscoroutinefunction(backend.write) - else backend.write - ) - writer( - data=sys.stdin.buffer, - target=target, - chunk_size=chunk_size, - ignore_errors=ignore_errors, - operation_type=BaseOperationType.UPDATE - if force - else BaseOperationType.INDEX, - ) - elif isinstance(backend, BaseHTTPBackend): - backend.write( - target=target, - data=sys.stdin.buffer, - chunk_size=chunk_size, - ignore_errors=ignore_errors, - simultaneous=simultaneous, - max_num_simultaneous=max_num_simultaneous, - ) - else: - msg = "Cannot find an implemented backend type for backend %s" - logger.error(msg, backend) - raise UnsupportedBackendException(msg, backend) + writer = backend.write + async_options = {} + if isinstance(backend, AsyncWritable): + writer = execute_async(backend.write) + async_options = {"concurrency": concurrency} + + writer( + data=sys.stdin.buffer, + target=target, + chunk_size=chunk_size, + ignore_errors=ignore_errors, + 
operation_type=BaseOperationType(operation_type) if operation_type else None, + **async_options, + ) @RalphCLI.lazy_backends_options(get_cli_list_backends, name="list") diff --git a/tests/backends/data/test_async_lrs.py b/tests/backends/data/test_async_lrs.py new file mode 100644 index 000000000..e504b287b --- /dev/null +++ b/tests/backends/data/test_async_lrs.py @@ -0,0 +1,766 @@ +"""Tests for Ralph LRS data backend.""" + +import asyncio +import json +import logging +import re +import time +from functools import partial + +import httpx +import pytest +from httpx import HTTPStatusError, RequestError +from pydantic import AnyHttpUrl, parse_obj_as +from pytest_httpx import HTTPXMock + +from ralph.backends.data.async_lrs import AsyncLRSDataBackend +from ralph.backends.data.base import BaseOperationType, DataBackendStatus +from ralph.backends.data.lrs import LRSDataBackend, LRSDataBackendSettings, LRSHeaders +from ralph.backends.lrs.base import LRSStatementsQuery +from ralph.exceptions import BackendException, BackendParameterException + +from ...helpers import mock_statement + + +def test_backends_data_async_lrs_default_instantiation(monkeypatch, fs, lrs_backend): + """Test the `LRSDataBackend` default instantiation.""" + fs.create_file(".env") + backend_settings_names = [ + "BASE_URL", + "USERNAME", + "PASSWORD", + "HEADERS", + "LOCALE_ENCODING", + "READ_CHUNK_SIZE", + "STATUS_ENDPOINT", + "STATEMENTS_ENDPOINT", + "WRITE_CHUNK_SIZE", + ] + for name in backend_settings_names: + monkeypatch.delenv(f"RALPH_BACKENDS__DATA__LRS__{name}", raising=False) + + assert AsyncLRSDataBackend.name == "async_lrs" + assert LRSDataBackend.name == "lrs" + backend_class = lrs_backend().__class__ + assert backend_class.settings_class == LRSDataBackendSettings + backend = backend_class() + assert backend.query_class == LRSStatementsQuery + assert backend.base_url == parse_obj_as(AnyHttpUrl, "http://0.0.0.0:8100") + assert backend.auth == ("ralph", "secret") + assert 
backend.settings.HEADERS == LRSHeaders() + assert backend.settings.LOCALE_ENCODING == "utf8" + assert backend.settings.READ_CHUNK_SIZE == 500 + assert backend.settings.STATUS_ENDPOINT == "/__heartbeat__" + assert backend.settings.STATEMENTS_ENDPOINT == "/xAPI/statements" + assert backend.settings.WRITE_CHUNK_SIZE == 500 + + # Test overriding default values with environment variables. + monkeypatch.setenv("RALPH_BACKENDS__DATA__LRS__USERNAME", "foo") + backend = backend_class() + assert backend.auth == ("foo", "secret") + + +def test_backends_data_async_lrs_instantiation_with_settings(lrs_backend): + """Test the LRS backend instantiation with settings.""" + + headers = LRSHeaders( + X_EXPERIENCE_API_VERSION="1.0.3", CONTENT_TYPE="application/json" + ) + settings = LRSDataBackendSettings( + BASE_URL="http://fake-lrs.com", + USERNAME="user", + PASSWORD="pass", + HEADERS=headers, + LOCALE_ENCODING="utf-16", + STATUS_ENDPOINT="/fake-status-endpoint", + STATEMENTS_ENDPOINT="/xAPI/statements", + READ_CHUNK_SIZE=5000, + WRITE_CHUNK_SIZE=5000, + ) + + assert AsyncLRSDataBackend.settings_class == LRSDataBackendSettings + backend = AsyncLRSDataBackend(settings) + assert backend.query_class == LRSStatementsQuery + assert isinstance(backend.base_url, AnyHttpUrl) + assert backend.auth == ("user", "pass") + assert backend.settings.HEADERS.CONTENT_TYPE == "application/json" + assert backend.settings.HEADERS.X_EXPERIENCE_API_VERSION == "1.0.3" + assert backend.settings.LOCALE_ENCODING == "utf-16" + assert backend.settings.STATUS_ENDPOINT == "/fake-status-endpoint" + assert backend.settings.STATEMENTS_ENDPOINT == "/xAPI/statements" + assert backend.settings.READ_CHUNK_SIZE == 5000 + assert backend.settings.WRITE_CHUNK_SIZE == 5000 + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_status_with_successful_request( + httpx_mock: HTTPXMock, lrs_backend +): + """Test the LRS backend status method returns `OK` when the request is + successful. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + # Mock GET response of HTTPX + url = "http://fake-lrs.com/__heartbeat__" + httpx_mock.add_response(url=url, method="GET", status_code=200) + status = await backend.status() + assert status == DataBackendStatus.OK + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_status_with_request_error( + httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the LRS backend status method returns `AWAY` when a `RequestError` + exception is caught. + """ + backend: AsyncLRSDataBackend = lrs_backend() + httpx_mock.add_exception(RequestError("Test Request Error")) + with caplog.at_level(logging.ERROR): + status = await backend.status() + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + "Unable to request the server", + ) in caplog.record_tuples + + assert status == DataBackendStatus.AWAY + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_status_with_http_status_error( + httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the LRS backend status method returns `ERROR` when an `HTTPStatusError` + is caught. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + exception = HTTPStatusError("Test HTTP Status Error", request=None, response=None) + httpx_mock.add_exception(exception) + with caplog.at_level(logging.ERROR): + status = await backend.status() + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + "Response raised an HTTP status of 4xx or 5xx", + ) in caplog.record_tuples + + assert status == DataBackendStatus.ERROR + await backend.close() + + +@pytest.mark.parametrize("max_statements", [None, 2, 4, 8]) +@pytest.mark.anyio +async def test_backends_data_async_lrs_read_max_statements( + httpx_mock: HTTPXMock, max_statements: int, lrs_backend +): + """Test the LRS backend `read` method `max_statements` argument.""" + statements = [mock_statement() for _ in range(3)] + response = {"statements": statements, "more": "/xAPI/statements/?pit_id=pit_id"} + more_response = {"statements": statements} + all_statements = statements * 2 + + # Mock GET response of HTTPX for target and "more" target without query parameter + url = "http://fake-lrs.com/xAPI/statements/?limit=500" + httpx_mock.add_response( + url=url, + method="GET", + json=response, + ) + # Given max_statements equal to two - already three statements were retrieved in the + # first request, thus no more requests are expected. 
+ if not max_statements == 2: + url = "http://fake-lrs.com/xAPI/statements/?limit=500&pit_id=pit_id" + httpx_mock.add_response(url=url, method="GET", json=more_response) + + backend: AsyncLRSDataBackend = lrs_backend() + result = [x async for x in backend.read(max_statements=max_statements)] + # Assert that result is of the proper length + assert result == all_statements[:max_statements] + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize("prefetch", [1, 10]) +async def test_backends_data_async_lrs_read_without_target( + prefetch: int, httpx_mock: HTTPXMock, lrs_backend +): + """Test that the LRS backend `read` method without target parameter value fetches + statements from '/xAPI/statements/' default endpoint. + """ + backend: AsyncLRSDataBackend = lrs_backend() + response = {"statements": mock_statement()} + url = "http://fake-lrs.com/xAPI/statements/?limit=500" + httpx_mock.add_response(url=url, method="GET", json=response) + result = [x async for x in backend.read(prefetch=prefetch)] + assert result == [response["statements"]] + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize("prefetch", [1, 10]) +async def test_backends_data_async_lrs_read_backend_error( + httpx_mock: HTTPXMock, caplog, prefetch: int, lrs_backend +): + """Test the LRS backend `read` method raises a `BackendException` when the server + returns an error. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + url = "http://fake-lrs.com/xAPI/statements/?limit=500" + httpx_mock.add_response(url=url, method="GET", status_code=500) + error = ( + "Failed to fetch statements: Server error '500 Internal Server Error' for url " + "'http://fake-lrs.com/xAPI/statements/?limit=500'\nFor more information check: " + "https://httpstatuses.com/500" + ) + with pytest.raises(BackendException, match=re.escape(error)): + with caplog.at_level(logging.ERROR): + _ = [x async for x in backend.read(prefetch=prefetch)] + + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + error, + ) in caplog.record_tuples + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize("prefetch", [1, 10]) +async def test_backends_data_async_lrs_read_without_pagination( + httpx_mock: HTTPXMock, prefetch: int, lrs_backend +): + """Test the LRS backend `read` method when the request on the target endpoint + returns statements without pagination. + """ + backend: AsyncLRSDataBackend = lrs_backend() + statements = [ + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), + ] + response = {"statements": statements} + + # Mock GET response of HTTPX without query parameter + url = "http://fake-lrs.com/xAPI/statements/?limit=500" + httpx_mock.add_response(url=url, method="GET", json=response) + + # Return an iterable of dict + result = [x async for x in backend.read(raw_output=False, prefetch=prefetch)] + assert result == statements + + # Return an iterable of bytes + result = [x async for x in backend.read(raw_output=True, prefetch=prefetch)] + assert result == [ + f"{json.dumps(statement)}\n".encode("utf-8") for statement in statements + ] + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize("prefetch", [1, 10]) +async def 
test_backends_data_async_lrs_read_without_pagination_with_query( + httpx_mock: HTTPXMock, prefetch: int, lrs_backend +): + """Test the LRS backend `read` method with a query when the request on the target + endpoint returns statements without pagination. + """ + backend: AsyncLRSDataBackend = lrs_backend() + verb_id = "https://w3id.org/xapi/video/verbs/played" + query = LRSStatementsQuery(verb=verb_id) + statements = [ + mock_statement(verb={"id": verb_id}), + mock_statement(verb={"id": verb_id}), + ] + response = {"statements": statements} + # Mock GET response of HTTPX with query parameter + url = f"http://fake-lrs.com/xAPI/statements/?limit=500&verb={verb_id}" + httpx_mock.add_response(url=url, method="GET", json=response) + # Return an iterable of dict + reader = backend.read(query=query, raw_output=False, prefetch=prefetch) + result = [x async for x in reader] + assert result == statements + + # Return an iterable of bytes + reader = backend.read(query=query, raw_output=True, prefetch=prefetch) + result = [x async for x in reader] + assert result == [ + f"{json.dumps(statement)}\n".encode("utf-8") for statement in statements + ] + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_read_with_pagination( + httpx_mock: HTTPXMock, lrs_backend +): + """Test the LRS backend `read` method when the request on the target endpoint + returns statements with pagination. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + statements = [ + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/initialized"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), + ] + more_statements = [ + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/seeked"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), + mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), + ] + all_statements = statements + more_statements + response = {"statements": statements, "more": "/xAPI/statements/?pit_id=pit_id"} + more_response = {"statements": more_statements} + + url = "http://fake-lrs.com/xAPI/statements/?limit=500" + httpx_mock.add_response(url=url, method="GET", json=response) + url = "http://fake-lrs.com/xAPI/statements/?limit=500&pit_id=pit_id" + httpx_mock.add_response(url=url, method="GET", json=more_response) + + # Return an iterable of dict + result = [x async for x in backend.read(raw_output=False)] + assert result == all_statements + + # Return an iterable of bytes + result = [x async for x in backend.read(raw_output=True)] + assert result == [ + f"{json.dumps(statement)}\n".encode("utf-8") for statement in all_statements + ] + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_read_with_pagination_with_query( + httpx_mock: HTTPXMock, lrs_backend +): + """Test the LRS backend `read` method with a query when the request on the target + endpoint returns statements with pagination. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + verb_id = "https://w3id.org/xapi/video/verbs/played" + query = LRSStatementsQuery(verb=verb_id) + statements = [mock_statement(verb={"id": verb_id})] + more_statements = [ + mock_statement(verb={"id": verb_id}), + ] + all_statements = statements + more_statements + response = {"statements": statements, "more": "/xAPI/statements/?pit_id=pit_id"} + more_response = {"statements": more_statements} + + # Mock GET response of HTTPX with query parameter + url = f"http://fake-lrs.com/xAPI/statements/?limit=500&verb={verb_id}" + httpx_mock.add_response(url=url, method="GET", json=response) + url = f"http://fake-lrs.com/xAPI/statements/?limit=500&verb={verb_id}&pit_id=pit_id" + httpx_mock.add_response(url=url, method="GET", json=more_response) + + # Return an iterable of dict + reader = backend.read(query=query, raw_output=False) + result = [x async for x in reader] + assert result == all_statements + + # Return an iterable of bytes + reader = backend.read(query=query, raw_output=True) + result = [x async for x in reader] + assert result == [ + f"{json.dumps(statement)}\n".encode("utf-8") for statement in all_statements + ] + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "chunk_size,concurrency,statement_count_logs", + [ + (500, 1, [6]), + (2, 1, [6]), + (500, 2, [6]), + (2, 2, [2, 2, 2]), + (1, 2, [1, 1, 1, 1, 1, 1]), + ], +) +async def test_backends_data_async_lrs_write_without_operation( # noqa: PLR0913 + httpx_mock: HTTPXMock, + lrs_backend, + caplog, + chunk_size, + concurrency, + statement_count_logs, +): + """Test the LRS backend `write` method, given no operation_type should POST to + the LRS server. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + statements = [mock_statement() for _ in range(6)] + + # Mock HTTPX POST + url = "http://fake-lrs.com/xAPI/statements/" + httpx_mock.add_response(url=url, method="POST", json=statements) + + with caplog.at_level(logging.DEBUG): + assert await backend.write( + data=statements, + chunk_size=chunk_size, + concurrency=concurrency, + ) == len(statements) + + # If no chunk_size is provided, a default value (500) should be used. + if chunk_size is None: + chunk_size = 500 + + assert ( + f"ralph.backends.data.{backend.name}", + logging.DEBUG, + "Start writing to the http://fake-lrs.com/xAPI/statements/ endpoint " + f"(chunk size: {chunk_size})", + ) in caplog.record_tuples + + if isinstance(backend, AsyncLRSDataBackend): + # Only async backends support `concurrency`. + log_records = list(caplog.record_tuples) + for statement_count_log in statement_count_logs: + log_records.remove( + ( + f"ralph.backends.data.{backend.name}", + logging.DEBUG, + f"Posted {statement_count_log} statements", + ) + ) + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_without_data(caplog, lrs_backend): + """Test the LRS backend `write` method returns null when no data to write + in the target endpoint are given. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + with caplog.at_level(logging.INFO): + result = await backend.write([]) + + assert ( + f"ralph.backends.data.{backend.name}", + logging.INFO, + "Data Iterator is empty; skipping write to target", + ) in caplog.record_tuples + + assert result == 0 + await backend.close() + + +@pytest.mark.parametrize( + "operation_type,error_msg", + [ + (BaseOperationType.APPEND, "Append operation_type is not allowed"), + (BaseOperationType.UPDATE, "Update operation_type is not allowed"), + (BaseOperationType.DELETE, "Delete operation_type is not allowed"), + ], +) +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_with_unsupported_operation( + lrs_backend, operation_type, caplog, error_msg +): + """Test the LRS backend `write` method, given an unsupported` `operation_type`, + should raise a `BackendParameterException`. + """ + backend: AsyncLRSDataBackend = lrs_backend() + with pytest.raises(BackendParameterException, match=error_msg): + with caplog.at_level(logging.ERROR): + await backend.write(data=[b"foo"], operation_type=operation_type) + + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + error_msg, + ) in caplog.record_tuples + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_with_invalid_parameters( + httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the LRS backend `write` method, given invalid_parameters + should raise a `BackendParameterException`. + """ + backend: AsyncLRSDataBackend = lrs_backend() + if not isinstance(backend, AsyncLRSDataBackend): + # Only async backends support `concurrency`. 
+ await backend.close() + return + + error = "concurrency must be a strictly positive integer" + with pytest.raises(BackendParameterException, match=error): + with caplog.at_level(logging.ERROR): + await backend.write(data=[b"foo"], concurrency=-1) + + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + error, + ) in caplog.record_tuples + + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_with_target( + httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the LRS backend `write` method with a target parameter value writes + statements to the target endpoint. + """ + backend: AsyncLRSDataBackend = lrs_backend() + data = [mock_statement() for _ in range(3)] + + # Mock HTTPX POST + url = "http://fake-lrs.com/not-xAPI/not-statements/" + httpx_mock.add_response( + url=url, + method="POST", + json=data, + ) + + with caplog.at_level(logging.DEBUG): + assert await backend.write(data=data, target="/not-xAPI/not-statements/") == 3 + + assert ( + f"ralph.backends.data.{backend.name}", + logging.DEBUG, + f"Start writing to the {url} endpoint (chunk size: 500)", + ) in caplog.record_tuples + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_with_target_and_binary_data( + httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the LRS backend `write` method with a target parameter value given + binary statements, writes them to the target endpoint. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + data = [mock_statement() for _ in range(3)] + bytes_data = [json.dumps(d).encode("utf-8") for d in data] + + # Mock HTTPX POST + url = "http://fake-lrs.com/not-xAPI/not-statements/" + httpx_mock.add_response( + url=url, + method="POST", + json=data, + ) + + with caplog.at_level(logging.DEBUG): + assert ( + await backend.write(data=bytes_data, target="/not-xAPI/not-statements/") + == 3 + ) + + assert ( + f"ralph.backends.data.{backend.name}", + logging.DEBUG, + f"Start writing to the {url} endpoint (chunk size: 500)", + ) in caplog.record_tuples + await backend.close() + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "operation_type", + [BaseOperationType.CREATE, BaseOperationType.INDEX], +) +async def test_backends_data_async_lrs_write_with_create_or_index_operation( + operation_type, httpx_mock: HTTPXMock, lrs_backend, caplog +): + """Test the `LRSHTTP.write` method with `CREATE` or `INDEX` operation_type writes + statements to the given target endpoint. 
+ """ + backend: AsyncLRSDataBackend = lrs_backend() + data = [mock_statement() for _ in range(3)] + + # Mock HTTPX POST + url = "http://fake-lrs.com/xAPI/statements/" + httpx_mock.add_response(url=url, method="POST", json=data) + + with caplog.at_level(logging.DEBUG): + assert await backend.write(data=data, operation_type=operation_type) == 3 + + assert ( + f"ralph.backends.data.{backend.name}", + logging.DEBUG, + "Posted 3 statements", + ) in caplog.record_tuples + await backend.close() + + +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_with_post_exception( + httpx_mock: HTTPXMock, + lrs_backend, + caplog, +): + """Test the `LRSHTTP.write` method with HTTP error.""" + backend: AsyncLRSDataBackend = lrs_backend() + data = [mock_statement()] + + # Mock HTTPX POST + url = "http://fake-lrs.com/xAPI/statements/" + httpx_mock.add_response(url=url, method="POST", json=data, status_code=500) + with pytest.raises(BackendException): + with caplog.at_level(logging.ERROR): + await backend.write(data=data) + + msg = ( + "Failed to post statements: Server error '500 Internal Server Error' for url " + "'http://fake-lrs.com/xAPI/statements/'\nFor more information check: " + "https://httpstatuses.com/500" + ) + assert ( + f"ralph.backends.data.{backend.name}", + logging.ERROR, + msg, + ) in caplog.record_tuples + + # Given `ignore_errors=True` the `write` method should log a warning message. 
+ with caplog.at_level(logging.WARNING): + assert not (await backend.write(data=data, ignore_errors=True)) + + assert ( + f"ralph.backends.data.{backend.name}", + logging.WARNING, + msg, + ) in caplog.record_tuples + await backend.close() + + +# Asynchronicity tests for dev purposes (skip in CI) + + +@pytest.mark.skip(reason="Timing based tests are too unstable to run in CI.") +@pytest.mark.anyio +@pytest.mark.parametrize( + "num_pages,chunk_size,network_latency_time", [(3, 3, 0.2), (10, 3, 0.2)] +) +async def test_backends_data_async_lrs_read_concurrency( + httpx_mock: HTTPXMock, num_pages, chunk_size, network_latency_time, lrs_backend +): + """Test concurrency performances in `read`, for development use. + + Args: + num_pages: the number of pages to generate + chunk_size: the number of results per page + network_latency_time: the wait duration before GET results + + NB: Maximal gains are (num_pages-1)*fetch_time; when batch_processing_time > + fetch_time + """ + backend: AsyncLRSDataBackend = lrs_backend() + if not isinstance(backend, AsyncLRSDataBackend): + # Only async backends support `concurrency`. 
+ await backend.close() + return + + async def _simulate_network_latency(request: httpx.Request, response): + """Return requested statements after an async sleep time.""" + + await asyncio.sleep(network_latency_time) + return httpx.Response(status_code=200, json=response) + + async def _simulate_slow_processing(): + """Async sleep for a fixed amount of time.""" + # Time per chunk = API response time to maximize saved time + processing_time = network_latency_time / chunk_size + await asyncio.sleep(processing_time) + + # Generate fake targets + targets = {0: "/xAPI/statements/"} + for index in range(1, num_pages): + targets[index] = f"/xAPI/statements/?pit_id=fake-pit-{index}" + + # Generate fake statements + all_statements = {} + for index in range(num_pages): + all_statements[index] = { + "statements": [mock_statement() for _ in range(chunk_size)] + } + if index < num_pages - 1: + all_statements[index]["more"] = targets[index + 1] + + # Mock HTTPX GET + for index in range(num_pages): + pit_id = f"&pit_id=fake-pit-{index}" + if index == 0: + pit_id = "" + url = f"http://fake-lrs.com/xAPI/statements/?limit={chunk_size}{pit_id}" + statements = all_statements[index] + httpx_mock.add_callback( + partial(_simulate_network_latency, response=statements), + url=url, + method="GET", + ) + + # Check that read with `prefetch` is faster than without when processing is slow + time_1 = time.time() + async for _ in backend.read(target=targets[0], chunk_size=chunk_size): + await _simulate_slow_processing() + without_prefetch_duration = time.time() - time_1 + + time_2 = time.time() + async for _ in backend.read(target=targets[0], chunk_size=chunk_size, prefetch=100): + await _simulate_slow_processing() + prefetch_duration = time.time() - time_2 + + # Assert gains are close enough to theoretical gains + proximity_ratio = 0.9 + assert ( + without_prefetch_duration + > prefetch_duration + proximity_ratio * (num_pages - 1) * network_latency_time + ) + + +@pytest.mark.skip(reason="Timing 
based tests are too unstable to run in CI") +@pytest.mark.anyio +async def test_backends_data_async_lrs_write_concurrency( + httpx_mock: HTTPXMock, lrs_backend +): + """Test concurrency performances in `write`, for development use.""" + backend: AsyncLRSDataBackend = lrs_backend() + if not isinstance(backend, AsyncLRSDataBackend): + # Only async backends support `concurrency`. + await backend.close() + return + + data = [mock_statement() for _ in range(6)] + + # Changing data length might break tests + assert len(data) == 6 + + # Mock HTTPX POST + async def simulate_network_latency(_): + await asyncio.sleep(0.5) + return httpx.Response(status_code=200) + + httpx_mock.add_callback(simulate_network_latency) + + # Check that concurrent write is faster than non-concurrent + time_1 = time.time() + await backend.write(data, chunk_size=2, concurrency=1) + non_concurrent_duration = time.time() - time_1 + + time_2 = time.time() + await backend.write(data=data, chunk_size=2, concurrency=3) + concurrent_duration = time.time() - time_2 + + # Server side processing time should be 3 times faster + assert non_concurrent_duration > 2.1 * concurrent_duration + + # Check that write with `concurrency`=2 functions as expected + time_1 = time.time() + await backend.write(data=data, chunk_size=1, concurrency=2) + limited_concurrency_duration = time.time() - time_1 + + # Server side processing time should be 3 times faster with unlimited + assert limited_concurrency_duration > 2.1 * concurrent_duration + assert limited_concurrency_duration <= 3.1 * concurrent_duration diff --git a/tests/backends/data/test_base.py b/tests/backends/data/test_base.py index e7f8f355a..0b5c76a55 100644 --- a/tests/backends/data/test_base.py +++ b/tests/backends/data/test_base.py @@ -29,7 +29,7 @@ ], ) def test_backends_data_base_validate_backend_query_with_valid_input(value, expected): - """Test the enforce_query_checks function given valid input.""" + """Test the validate_backend_query function given valid 
input.""" class MockBaseDataBackend(BaseDataBackend[BaseDataBackendSettings, BaseQuery]): """A class mocking the base data backend class.""" @@ -62,10 +62,10 @@ def close(self): ), ], ) -def test_backends_data_base_enforce_query_checks_with_invalid_input( +def test_backends_data_base_validate_backend_query_with_invalid_input( value, error, caplog ): - """Test the enforce_query_checks function given invalid input.""" + """Test the validate_backend_query function given invalid input.""" class MockBaseDataBackend(BaseDataBackend[BaseDataBackendSettings, BaseQuery]): """A class mocking the base database class.""" diff --git a/tests/backends/http/__init__.py b/tests/backends/http/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/backends/http/test_async_lrs.py b/tests/backends/http/test_async_lrs.py deleted file mode 100644 index 9c6b6df2f..000000000 --- a/tests/backends/http/test_async_lrs.py +++ /dev/null @@ -1,1083 +0,0 @@ -"""Tests for Ralph LRS HTTP backend.""" - -import asyncio -import json -import logging -import time -from functools import partial -from urllib.parse import ParseResult, parse_qsl, urlencode, urljoin, urlparse - -import httpx -import pytest -from httpx import HTTPStatusError, RequestError -from pydantic import AnyHttpUrl, parse_obj_as -from pytest_httpx import HTTPXMock - -from ralph.backends.http.async_lrs import ( - AsyncLRSHTTPBackend, - LRSHeaders, - LRSHTTPBackendSettings, - OperationType, -) -from ralph.backends.http.base import HTTPBackendStatus -from ralph.backends.lrs.base import LRSStatementsQuery -from ralph.exceptions import BackendException, BackendParameterException - -from ...helpers import mock_statement - - -async def _unpack_async_generator(async_gen): - """Unpack content of async generator into list.""" - result = [] - async for value in async_gen: - result.append(value) - return result - - -def test_backends_http_async_lrs_default_instantiation(monkeypatch, fs): - """Test the `LRSHTTPBackend` 
default instantiation.""" - fs.create_file(".env") - backend_settings_names = [ - "BASE_URL", - "USERNAME", - "PASSWORD", - "HEADERS", - "STATUS_ENDPOINT", - "STATEMENTS_ENDPOINT", - ] - for name in backend_settings_names: - monkeypatch.delenv(f"RALPH_BACKENDS__HTTP__LRS__{name}", raising=False) - - assert AsyncLRSHTTPBackend.name == "async_lrs" - assert AsyncLRSHTTPBackend.settings_class == LRSHTTPBackendSettings - backend = AsyncLRSHTTPBackend() - assert backend.query == LRSStatementsQuery - assert backend.base_url == parse_obj_as(AnyHttpUrl, "http://0.0.0.0:8100") - assert backend.auth == ("ralph", "secret") - assert backend.settings.HEADERS == LRSHeaders() - assert backend.settings.STATUS_ENDPOINT == "/__heartbeat__" - assert backend.settings.STATEMENTS_ENDPOINT == "/xAPI/statements" - - # Test overriding default values with environment variables. - monkeypatch.setenv("RALPH_BACKENDS__HTTP__LRS__USERNAME", "foo") - backend = AsyncLRSHTTPBackend() - assert backend.auth == ("foo", "secret") - - -def test_backends_http_async_lrs_instantiation(): - """Test the LRS backend default instantiation.""" - - headers = LRSHeaders( - X_EXPERIENCE_API_VERSION="1.0.3", CONTENT_TYPE="application/json" - ) - settings = LRSHTTPBackendSettings( - BASE_URL="http://fake-lrs.com", - USERNAME="user", - PASSWORD="pass", - HEADERS=headers, - STATUS_ENDPOINT="/fake-status-endpoint", - STATEMENTS_ENDPOINT="/xAPI/statements", - ) - - assert AsyncLRSHTTPBackend.name == "async_lrs" - assert AsyncLRSHTTPBackend.settings_class == LRSHTTPBackendSettings - backend = AsyncLRSHTTPBackend(settings) - assert backend.query == LRSStatementsQuery - assert isinstance(backend.base_url, AnyHttpUrl) - assert backend.auth == ("user", "pass") - assert backend.settings.HEADERS.CONTENT_TYPE == "application/json" - assert backend.settings.HEADERS.X_EXPERIENCE_API_VERSION == "1.0.3" - assert backend.settings.STATUS_ENDPOINT == "/fake-status-endpoint" - assert backend.settings.STATEMENTS_ENDPOINT == 
"/xAPI/statements" - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_status_with_successful_request( - httpx_mock: HTTPXMock, -): - """Test the LRS backend status method returns `OK` when the request is - successful. - """ - base_url = "http://fake-lrs.com" - status_endpoint = "/__heartbeat__" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - STATUS_ENDPOINT=status_endpoint, - ) - backend = AsyncLRSHTTPBackend(settings) - - # Mock GET response of HTTPX - httpx_mock.add_response( - url=urljoin(base_url, status_endpoint), method="GET", status_code=200 - ) - - status = await backend.status() - assert status == HTTPBackendStatus.OK - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_status_with_request_error( - httpx_mock: HTTPXMock, caplog -): - """Test the LRS backend status method returns `AWAY` when a `RequestError` - exception is caught. - """ - - base_url = "http://fake-lrs.com" - status_endpoint = "/__heartbeat__" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - STATUS_ENDPOINT=status_endpoint, - ) - backend = AsyncLRSHTTPBackend(settings) - - httpx_mock.add_exception(RequestError("Test Request Error")) - - with caplog.at_level(logging.ERROR): - status = await backend.status() - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - "Unable to request the server", - ) in caplog.record_tuples - - assert status == HTTPBackendStatus.AWAY - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_status_with_http_status_error( - httpx_mock: HTTPXMock, caplog -): - """Test the LRS backend status method returns `ERROR` when an `HTTPStatusError` - is caught. 
- """ - - base_url = "http://fake-lrs.com" - status_endpoint = "/__heartbeat__" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - STATUS_ENDPOINT=status_endpoint, - ) - backend = AsyncLRSHTTPBackend(settings) - - httpx_mock.add_exception( - HTTPStatusError("Test HTTP Status Error", request=None, response=None) - ) - - with caplog.at_level(logging.ERROR): - status = await backend.status() - - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - "Response raised an HTTP status of 4xx or 5xx", - ) in caplog.record_tuples - assert status == HTTPBackendStatus.ERROR - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_list(caplog): - """Test the LRS backend `list` method raises a `NotImplementedError`.""" - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - msg = ( - "LRS HTTP backend does not support `list` method, " - "cannot list from /xAPI/statements/" - ) - with pytest.raises(NotImplementedError, match=msg): - with caplog.at_level(logging.ERROR): - await backend.list(target=target) - - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - ( - "LRS HTTP backend does not support `list` method, cannot list" - f" from {target}" - ), - ) in caplog.record_tuples - - -@pytest.mark.parametrize("max_statements", [None, 2, 4, 8]) -@pytest.mark.anyio -async def test_backends_http_async_lrs_read_max_statements( - httpx_mock: HTTPXMock, max_statements: int -): - """Test the LRS backend `read` method `max_statements` property.""" - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - more_target = "/xAPI/statements/?pit_id=fake-pit-id" - - chunk_size = 3 - - statements = { - "statements": [mock_statement() for _ in range(chunk_size)], - "more": more_target, - } - more_statements = { - "statements": [mock_statement() for _ in 
range(chunk_size)], - } - - # Mock GET response of HTTPX for target and "more" target without query parameter - default_params = LRSStatementsQuery(limit=chunk_size).dict( - exclude_none=True, exclude_unset=True - ) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=target, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements, - ) - - default_params.update(dict(parse_qsl(urlparse(more_target).query))) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=urlparse(more_target).path, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=more_statements, - ) - - settings = AsyncLRSHTTPBackend.settings_class( - BASE_URL=base_url, USERNAME="user", PASSWORD="pass" - ) - backend = AsyncLRSHTTPBackend(settings) - - # Return an iterable of dict - result = await _unpack_async_generator( - backend.read( - target=target, max_statements=max_statements, chunk_size=chunk_size - ) - ) - all_statements = statements["statements"] + more_statements["statements"] - assert len(all_statements) == 6 - - # Assert that result is of the proper length - if max_statements is None: - assert result == all_statements - else: - assert result == all_statements[:max_statements] - - -@pytest.mark.parametrize("greedy", [False, True]) -@pytest.mark.anyio -async def test_backends_http_async_lrs_read_without_target( - httpx_mock: HTTPXMock, greedy: bool -): - """Test that the LRS backend `read` method without target parameter value fetches - statements from '/xAPI/statements' default endpoint. 
- """ - - base_url = "http://fake-lrs.com" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - statements = {"statements": [mock_statement() for _ in range(3)]} - - # Mock HTTPX GET - default_params = LRSStatementsQuery(limit=500).dict( - exclude_none=True, exclude_unset=True - ) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=backend.settings.STATEMENTS_ENDPOINT, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements, - ) - - result = await _unpack_async_generator(backend.read(greedy=greedy)) - assert result == statements["statements"] - assert len(result) == 3 - - -@pytest.mark.anyio -@pytest.mark.parametrize("greedy", [False, True]) -async def test_backends_http_async_lrs_read_backend_error( - httpx_mock: HTTPXMock, caplog, greedy: bool -): - """Test the LRS backend `read` method raises a `BackendException` when the server - returns an error. 
- """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - # Mock GET response of HTTPX - default_params = LRSStatementsQuery(limit=500).dict( - exclude_none=True, exclude_unset=True - ) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=target, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - status_code=500, - ) - - with pytest.raises(BackendException, match="Failed to fetch statements."): - with caplog.at_level(logging.ERROR): - await _unpack_async_generator(backend.read(target=target, greedy=greedy)) - - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - "Failed to fetch statements.", - ) in caplog.record_tuples - - -@pytest.mark.anyio -@pytest.mark.parametrize("greedy", [False, True]) -async def test_backends_http_async_lrs_read_without_pagination( - httpx_mock: HTTPXMock, greedy: bool -): - """Test the LRS backend `read` method when the request on the target endpoint - returns statements without pagination.""" - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - statements = { - "statements": [ - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), - ] - } - - # Mock GET response of HTTPX without query parameter - default_params = LRSStatementsQuery(limit=500).dict( - exclude_none=True, exclude_unset=True - ) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - 
path=target, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements, - ) - - # Return an iterable of dict - result = await _unpack_async_generator( - backend.read(target=target, raw_output=False, greedy=greedy) - ) - assert result == statements["statements"] - assert len(result) == 3 - - # Return an iterable of bytes - result = await _unpack_async_generator( - backend.read(target=target, raw_output=True, greedy=greedy) - ) - assert result == [ - bytes(json.dumps(statement), encoding="utf-8") - for statement in statements["statements"] - ] - assert len(result) == 3 - - # Mock GET response of HTTPX with query parameter - query = LRSStatementsQuery(verb="https://w3id.org/xapi/video/verbs/played") - - statements_with_query_played_verb = { - "statements": [ - raw for raw in statements["statements"] if "played" in raw["verb"]["id"] - ] - } - query_params = query.dict(exclude_none=True, exclude_unset=True) - query_params.update({"limit": 500}) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=target, - query=urlencode(query_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements_with_query_played_verb, - ) - # Return an iterable of dict - result = await _unpack_async_generator( - backend.read(query=query, target=target, raw_output=False, greedy=greedy) - ) - assert result == statements_with_query_played_verb["statements"] - assert len(result) == 2 - - # Return an iterable of bytes - result = await _unpack_async_generator( - backend.read(query=query, target=target, raw_output=True, greedy=greedy) - ) - assert result == [ - bytes(json.dumps(statement), encoding="utf-8") - for statement in statements_with_query_played_verb["statements"] - ] - assert len(result) == 2 - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_read_with_pagination(httpx_mock: HTTPXMock): - """Test the LRS backend 
`read` method when the request on the target endpoint - returns statements with pagination.""" - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - more_target = "/xAPI/statements/?pit_id=fake-pit-id" - statements = { - "statements": [ - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), - mock_statement( - verb={"id": "https://w3id.org/xapi/video/verbs/initialized"} - ), - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), - ], - "more": more_target, - } - more_statements = { - "statements": [ - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/seeked"}), - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/played"}), - mock_statement(verb={"id": "https://w3id.org/xapi/video/verbs/paused"}), - ] - } - - # Mock GET response of HTTPX for target and "more" target without query parameter - default_params = LRSStatementsQuery(limit=500).dict( - exclude_none=True, exclude_unset=True - ) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=target, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements, - ) - default_params.update(dict(parse_qsl(urlparse(more_target).query))) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=urlparse(more_target).path, - query=urlencode(default_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=more_statements, - ) - - # Return an iterable of dict - result = await _unpack_async_generator( - backend.read(target=target, raw_output=False) - ) - assert result == statements["statements"] + more_statements["statements"] - assert len(result) == 6 - - # Return an iterable of bytes - 
result = await _unpack_async_generator(backend.read(target=target, raw_output=True)) - assert result == [ - bytes(json.dumps(statement), encoding="utf-8") - for statement in statements["statements"] + more_statements["statements"] - ] - assert len(result) == 6 - - # Mock GET response of HTTPX with query parameter - query = LRSStatementsQuery(verb="https://w3id.org/xapi/video/verbs/played") - - statements_with_query_played_verb = { - "statements": [ - raw for raw in statements["statements"] if "played" in raw["verb"]["id"] - ], - "more": more_target, - } - more_statements_with_query_played_verb = { - "statements": [ - raw - for raw in more_statements["statements"] - if "played" in raw["verb"]["id"] - ] - } - query_params = query.dict(exclude_none=True, exclude_unset=True) - query_params.update({"limit": 500}) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=target, - query=urlencode(query_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=statements_with_query_played_verb, - ) - query_params.update(dict(parse_qsl(urlparse(more_target).query))) - httpx_mock.add_response( - url=ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=urlparse(more_target).path, - query=urlencode(query_params).lower(), - params="", - fragment="", - ).geturl(), - method="GET", - json=more_statements_with_query_played_verb, - ) - - # Return an iterable of dict - - result = await _unpack_async_generator( - backend.read(query=query, target=target, raw_output=False) - ) - assert ( - result - == statements_with_query_played_verb["statements"] - + more_statements_with_query_played_verb["statements"] - ) - assert len(result) == 2 - - # Return an iterable of bytes - result = await _unpack_async_generator( - backend.read(query=query, target=target, raw_output=True) - ) - - assert result == [ - bytes(json.dumps(statement), encoding="utf-8") - for statement 
in statements_with_query_played_verb["statements"] - + more_statements_with_query_played_verb["statements"] - ] - assert len(result) == 2 - - -@pytest.mark.anyio -@pytest.mark.parametrize( - "chunk_size,simultaneous,max_num_simultaneous", - [ - (None, False, None), - (500, False, None), - (2, False, None), - (500, True, 1), - (2, True, 1), - (500, True, 2), - (2, True, 2), - (500, True, None), - (2, True, None), - ], -) -async def test_backends_http_async_lrs_write_without_operation( - httpx_mock: HTTPXMock, caplog, chunk_size, simultaneous, max_num_simultaneous -): - """Test the LRS backend `write` method, given no operation_type should POST to - the LRS server. - """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - data = [mock_statement() for _ in range(6)] - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - # Mock HTTPX POST - httpx_mock.add_response(url=urljoin(base_url, target), method="POST", json=data) - - with caplog.at_level(logging.DEBUG): - result = await backend.write( - target=target, - data=data, - chunk_size=chunk_size, - simultaneous=simultaneous, - max_num_simultaneous=max_num_simultaneous, - ) - - # If no chunk_size is provided, a default value (500) should be used. - if chunk_size is None: - chunk_size = 500 - - assert ( - "ralph.backends.http.async_lrs", - logging.DEBUG, - f"Start writing to the {base_url}{target} endpoint (chunk size: {chunk_size})", - ) in caplog.record_tuples - - assert ( - "ralph.backends.http.async_lrs", - logging.DEBUG, - f"Posted {len(data)} statements", - ) in caplog.record_tuples - - assert result == len(data) - - -@pytest.mark.anyio -async def test_backends_http_async_lrs_write_without_data(caplog): - """Test the LRS backend `write` method returns null when no data to write - in the target endpoint are given. 
- """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - with caplog.at_level(logging.INFO): - result = await backend.write(target=target, data=[]) - - assert ( - "ralph.backends.http.async_lrs", - logging.INFO, - "Data Iterator is empty; skipping write to target", - ) in caplog.record_tuples - - assert result == 0 - - -@pytest.mark.parametrize( - "operation_type,error_msg", - [ - (OperationType.APPEND, "append operation type is not supported."), - (OperationType.UPDATE, "update operation type is not supported."), - (OperationType.DELETE, "delete operation type is not supported."), - ], -) -@pytest.mark.anyio -async def test_backends_http_async_lrs_write_with_unsupported_operation( - operation_type, caplog, error_msg -): - """Test the LRS backend `write` method, given an unsupported` `operation_type`, - should raise a `BackendParameterException`. 
- """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - with pytest.raises(BackendParameterException, match=error_msg): - with caplog.at_level(logging.ERROR): - await backend.write( - target=target, data=[b"foo"], operation_type=operation_type - ) - - assert ( - "ralph.backends.http.async_lrs", - logging.DEBUG, - f"{operation_type.value} operation type is not supported.", - ) in caplog.record_tuples - - -@pytest.mark.parametrize( - "simultaneous,max_num_simultaneous,error_msg", - [ - (True, 0, "max_num_simultaneous must be a strictly positive integer"), - (False, 2, "max_num_simultaneous can only be used with `simultaneous=True`"), - ], -) -@pytest.mark.anyio -async def test_backends_https_async_lrs_write_with_invalid_parameters( - caplog, simultaneous, max_num_simultaneous, error_msg -): - """Test the LRS backend `write` method, given invalid_parameters - should raise a `BackendParameterException`. - """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements/" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - with pytest.raises(BackendParameterException, match=error_msg): - with caplog.at_level(logging.ERROR): - await backend.write( - target=target, - data=[b"foo"], - simultaneous=simultaneous, - max_num_simultaneous=max_num_simultaneous, - ) - - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - error_msg, - ) in caplog.record_tuples - - -@pytest.mark.anyio -async def test_backends_https_async_lrs_write_without_target( - httpx_mock: HTTPXMock, caplog -): - """Test the LRS backend `write` method without target parameter value writes - statements to '/xAPI/statements' default endpoint. 
- """ - - base_url = "http://fake-lrs.com" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - data = [mock_statement() for _ in range(3)] - - # Mock HTTPX POST - httpx_mock.add_response( - url=urljoin(base_url, backend.settings.STATEMENTS_ENDPOINT), - method="POST", - json=data, - ) - - with caplog.at_level(logging.DEBUG): - result = await backend.write(data=data, operation_type=OperationType.CREATE) - assert ( - "ralph.backends.http.async_lrs", - logging.DEBUG, - "Start writing to the " - f"{base_url}{LRSHTTPBackendSettings().STATEMENTS_ENDPOINT} " - "endpoint (chunk size: 500)", - ) in caplog.record_tuples - - assert result == len(data) - - -@pytest.mark.anyio -async def test_backends_https_async_lrs_write_with_create_or_index_operation( - httpx_mock: HTTPXMock, caplog -): - """Test the `LRSHTTP.write` method with `CREATE` or `INDEX` operation_type writes - statements to the given target endpoint. 
- """ - - base_url = "http://fake-lrs.com" - target = "/xAPI/statements" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - data = [mock_statement() for _ in range(3)] - - # Mock HTTPX POST - httpx_mock.add_response(url=urljoin(base_url, target), method="POST", json=data) - - with caplog.at_level(logging.INFO): - result = await backend.write( - target=target, data=data, operation_type=OperationType.CREATE - ) - assert result == len(data) - - with caplog.at_level(logging.INFO): - result = await backend.write( - target=target, data=data, operation_type=OperationType.INDEX - ) - assert result == len(data) - - -@pytest.mark.anyio -async def test_backends_https_async_lrs_write_backend_exception( - httpx_mock: HTTPXMock, - caplog, -): - """Test the `LRSHTTP.write` method with HTTP error.""" - base_url = "http://fake-lrs.com" - target = "/xAPI/statements" - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - data = [mock_statement()] - - # Mock HTTPX POST - httpx_mock.add_response( - url=urljoin(base_url, target), method="POST", json=data, status_code=500 - ) - with pytest.raises(BackendException): - with caplog.at_level(logging.ERROR): - await backend.write(target=target, data=data) - assert ( - "ralph.backends.http.async_lrs", - logging.ERROR, - "Failed to post statements", - ) in caplog.record_tuples - - -# Asynchronicity tests for dev purposes (skip in CI) - - -@pytest.mark.skip(reason="Timing based tests are too unstable to run in CI.") -@pytest.mark.anyio -@pytest.mark.parametrize( - "num_pages,chunk_size,network_latency_time", [(3, 3, 0.2), (10, 3, 0.2)] -) -async def test_backends_https_async_lrs_read_concurrency( - httpx_mock: HTTPXMock, num_pages, chunk_size, network_latency_time -): - """Test concurrency performances in `read`, for development use. 
- - Args: - num_pages: the number of pages to generate - chunk_size: the number of results per page - network_latency_time: the wait duration before GET results - - NB: Maximal gains are (num_pages-1)*fetch_time; when batch_processing_time > - fetch_time - """ - - async def _simulate_network_latency(request: httpx.Request, response): - """Return requested statements after an async sleep time.""" - - await asyncio.sleep(network_latency_time) - return httpx.Response(status_code=200, json=response) - - async def _simulate_slow_processing(): - """Async sleep for a fixed amount of time.""" - # Time per chunk = API response time to maximize saved time - processing_time = network_latency_time / chunk_size - await asyncio.sleep(processing_time) - - base_url = "http://fake-lrs.com" - - # Generate fake targets - target_template = "/xAPI/statements/?pit_id=fake-pit-{}" - targets = {0: "/xAPI/statements/"} - for index in range(1, num_pages): - targets[index] = target_template.format(index) - - # Generate fake statements - all_statements = {} - for index in range(num_pages): - all_statements[index] = { - "statements": [mock_statement() for _ in range(chunk_size)] - } - if index < num_pages - 1: - all_statements[index]["more"] = targets[index + 1] - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - # Mock HTTPX GET - params = {"limit": chunk_size} - for index in range(num_pages): - url = ParseResult( - scheme=urlparse(base_url).scheme, - netloc=urlparse(base_url).netloc, - path=urlparse(targets[index]).path, - query=urlencode(params), - params="", - fragment="", - ).geturl() - - statements = all_statements[index] - httpx_mock.add_callback( - partial(_simulate_network_latency, response=statements), - url=url, - method="GET", - ) - - if index < num_pages - 1: - params.update(dict(parse_qsl(urlparse(targets[index + 1]).query))) - - # Check that greedy read is faster than non-greedy when 
processing is slow - time_1 = time.time() - counter = 0 - async for _ in backend.read(target=targets[0], chunk_size=chunk_size, greedy=False): - await _simulate_slow_processing() - counter += 1 - duration_non_greedy = time.time() - time_1 - - time_2 = time.time() - async for _ in backend.read(target=targets[0], chunk_size=chunk_size, greedy=True): - await _simulate_slow_processing() - duration_greedy = time.time() - time_2 - - # Assert gains are close enough to theoretical gains - proximity_ratio = 0.9 - assert ( - duration_non_greedy - > duration_greedy + proximity_ratio * (num_pages - 1) * network_latency_time - ) - - -@pytest.mark.skip(reason="Timing based tests are too unstable to run in CI") -@pytest.mark.anyio -async def test_backends_https_async_lrs_write_concurrency( - httpx_mock: HTTPXMock, -): - """Test concurrency performances in `write`, for development use.""" - - base_url = "http://fake-lrs.com" - - data = [mock_statement() for _ in range(6)] - - # Changing data length might break tests - assert len(data) == 6 - - settings = LRSHTTPBackendSettings( - BASE_URL=base_url, - USERNAME="user", - PASSWORD="pass", - ) - backend = AsyncLRSHTTPBackend(settings) - - # Mock HTTPX POST - async def simulate_network_latency(request: httpx.Request): - await asyncio.sleep(0.5) - return httpx.Response( - status_code=200, - ) - - httpx_mock.add_callback(simulate_network_latency) - - # Check that simultaneous write is faster than non-simultaneous - time_1 = time.time() - await backend.write( - data=data, - chunk_size=2, - simultaneous=False, - max_num_simultaneous=None, - ) - duration_non_simultaneous = time.time() - time_1 - - time_2 = time.time() - await backend.write( - data=data, - chunk_size=2, - simultaneous=True, - max_num_simultaneous=None, - ) - duration_simultaneous = time.time() - time_2 - - # Server side processing time should be 3 times faster - assert duration_non_simultaneous > 2.1 * duration_simultaneous - - # Check that write with max_num_simultaneous 
functions as expected - time_1 = time.time() - await backend.write( - data=data, - chunk_size=1, - simultaneous=True, - max_num_simultaneous=2, - ) - duration_limited_simultaneous = time.time() - time_1 - - time_2 = time.time() - await backend.write( - data=data, - chunk_size=1, - simultaneous=True, - max_num_simultaneous=None, - ) - duration_unlimited_simultaneous = time.time() - time_2 - - # Server side processing time should be 3 times faster with unlimited - assert duration_limited_simultaneous > 2.1 * duration_unlimited_simultaneous - assert duration_limited_simultaneous <= 3.1 * duration_unlimited_simultaneous diff --git a/tests/backends/http/test_base.py b/tests/backends/http/test_base.py deleted file mode 100644 index d5bbf9333..000000000 --- a/tests/backends/http/test_base.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Tests for Ralph base HTTP backend.""" - -from typing import Iterator, Union - -from ralph.backends.http.base import BaseHTTPBackend, BaseQuery - - -def test_backends_http_base_abstract_interface_with_implemented_abstract_method(): - """Test the interface mechanism with properly implemented abstract methods.""" - - class GoodStorage(BaseHTTPBackend): - """Correct implementation with required abstract methods.""" - - name = "good" - - async def status(self): - """Fake the status method.""" - - async def list( - self, target: str = None, details: bool = False, new: bool = False - ) -> Iterator[Union[str, dict]]: - """Fake the list method.""" - - async def read(self): - """Fake the read method.""" - - async def write(self): - """Fake the write method.""" - - GoodStorage() - - assert GoodStorage.name == "good" - assert GoodStorage.query == BaseQuery diff --git a/tests/backends/http/test_lrs.py b/tests/backends/http/test_lrs.py deleted file mode 100644 index 130fb0df8..000000000 --- a/tests/backends/http/test_lrs.py +++ /dev/null @@ -1,167 +0,0 @@ -"""Tests for Ralph Async LRS HTTP backend.""" - -import asyncio -import re -from unittest.mock import 
AsyncMock - -import pytest -from pydantic import AnyHttpUrl, parse_obj_as - -from ralph.backends.http.async_lrs import ( - AsyncLRSHTTPBackend, - HTTPBackendStatus, - LRSHeaders, - LRSHTTPBackendSettings, -) -from ralph.backends.http.lrs import LRSHTTPBackend -from ralph.backends.lrs.base import LRSStatementsQuery - - -@pytest.mark.anyio -@pytest.mark.parametrize("method", ["status", "list", "write", "read"]) -async def test_backends_http_lrs_in_async_setting(monkeypatch, method): - """Test that backend returns the proper error when run in async function.""" - - # Define mock responses - if method == "read": - read_mock_response = [{"hello": "world"}, {"save": "pandas"}] - - async def response_mock(*args, **kwargs): - """Mock a read function.""" - - for statement in read_mock_response: - yield statement - - else: - response_mock = AsyncMock(return_value=HTTPBackendStatus.OK) - monkeypatch.setattr(AsyncLRSHTTPBackend, method, response_mock) - - async def async_function(): - """Encapsulate the synchronous method in an asynchronous function.""" - lrs = LRSHTTPBackend() - if method == "read": - list(getattr(lrs, method)()) - else: - getattr(lrs, method)() - - # Check that the proper error is raised - with pytest.raises( - RuntimeError, - match=re.escape( - ( - f"This event loop is already running. You must use " - f"`AsyncLRSHTTPBackend.{method}` (instead of `LRSHTTPBackend.{method}`)" - ", or run this code outside the current event loop." 
- ) - ), - ): - await async_function() - - -@pytest.mark.anyio -def test_backends_http_lrs_default_instantiation(monkeypatch, fs): - """Test the `LRSHTTPBackend` default instantiation.""" - fs.create_file(".env") - backend_settings_names = [ - "BASE_URL", - "USERNAME", - "PASSWORD", - "HEADERS", - "STATUS_ENDPOINT", - "STATEMENTS_ENDPOINT", - ] - for name in backend_settings_names: - monkeypatch.delenv(f"RALPH_BACKENDS__HTTP__LRS__{name}", raising=False) - - assert LRSHTTPBackend.name == "lrs" - assert LRSHTTPBackend.settings_class == LRSHTTPBackendSettings - backend = LRSHTTPBackend() - assert backend.query == LRSStatementsQuery - assert backend.base_url == parse_obj_as(AnyHttpUrl, "http://0.0.0.0:8100") - assert backend.auth == ("ralph", "secret") - assert backend.settings.HEADERS == LRSHeaders() - assert backend.settings.STATUS_ENDPOINT == "/__heartbeat__" - assert backend.settings.STATEMENTS_ENDPOINT == "/xAPI/statements" - - # Test overriding default values with environment variables. 
- monkeypatch.setenv("RALPH_BACKENDS__HTTP__LRS__USERNAME", "foo") - backend = LRSHTTPBackend() - assert backend.auth == ("foo", "secret") - - -def test_backends_http_lrs_instantiation_with_settings(): - """Test the LRS backend default instantiation.""" - - headers = LRSHeaders( - X_EXPERIENCE_API_VERSION="1.0.3", CONTENT_TYPE="application/json" - ) - settings = LRSHTTPBackendSettings( - BASE_URL="http://fake-lrs.com", - USERNAME="user", - PASSWORD="pass", - HEADERS=headers, - STATUS_ENDPOINT="/fake-status-endpoint", - STATEMENTS_ENDPOINT="/xAPI/statements", - ) - - assert LRSHTTPBackend.name == "lrs" - assert LRSHTTPBackend.settings_class == LRSHTTPBackendSettings - backend = LRSHTTPBackend(settings) - assert backend.query == LRSStatementsQuery - assert isinstance(backend.base_url, AnyHttpUrl) - assert backend.auth == ("user", "pass") - assert backend.settings.HEADERS.CONTENT_TYPE == "application/json" - assert backend.settings.HEADERS.X_EXPERIENCE_API_VERSION == "1.0.3" - assert backend.settings.STATUS_ENDPOINT == "/fake-status-endpoint" - assert backend.settings.STATEMENTS_ENDPOINT == "/xAPI/statements" - - -def test_backends_http_lrs_inheritence(monkeypatch): - """Test that `LRSHTTPBackend` properly inherits from `AsyncLRSHTTPBackend`.""" - lrs = LRSHTTPBackend() - - # Necessary when using anyio - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - # Test class inheritence - assert issubclass(lrs.__class__, AsyncLRSHTTPBackend) - - # Test "status" - status_mock_response = HTTPBackendStatus.OK - status_mock = AsyncMock(return_value=status_mock_response) - monkeypatch.setattr(AsyncLRSHTTPBackend, "status", status_mock) - assert lrs.status() == status_mock_response - status_mock.assert_awaited() - - # Test "list" - list_exception = NotImplementedError - list_mock = AsyncMock(side_effect=list_exception) - monkeypatch.setattr(AsyncLRSHTTPBackend, "list", list_mock) - with pytest.raises(list_exception): - lrs.list() - - # Test "read" - 
read_mock_response = [{"hello": "world"}, {"save": "pandas"}] - read_chunk_size = 11 - - async def read_mock(*args, **kwargs): - """Mock a read function.""" - - # For simplicity, check that parameters are passed in function - assert kwargs["chunk_size"] == read_chunk_size - for statement in read_mock_response: - yield statement - - monkeypatch.setattr(AsyncLRSHTTPBackend, "read", read_mock) - assert list(lrs.read(chunk_size=read_chunk_size)) == read_mock_response - - # Test "write" - write_mock_response = 118218 - chunk_size = 17 - write_mock = AsyncMock(return_value=write_mock_response) - monkeypatch.setattr(AsyncLRSHTTPBackend, "write", write_mock) - assert lrs.write(chunk_size=chunk_size) == write_mock_response - write_mock.assert_called_with(chunk_size=chunk_size) - - loop.close() diff --git a/tests/backends/test_loader.py b/tests/backends/test_loader.py index 07c983ea6..62bff3624 100644 --- a/tests/backends/test_loader.py +++ b/tests/backends/test_loader.py @@ -3,16 +3,16 @@ import logging from ralph.backends.data.async_es import AsyncESDataBackend +from ralph.backends.data.async_lrs import AsyncLRSDataBackend from ralph.backends.data.async_mongo import AsyncMongoDataBackend from ralph.backends.data.clickhouse import ClickHouseDataBackend from ralph.backends.data.es import ESDataBackend from ralph.backends.data.fs import FSDataBackend from ralph.backends.data.ldp import LDPDataBackend +from ralph.backends.data.lrs import LRSDataBackend from ralph.backends.data.mongo import MongoDataBackend from ralph.backends.data.s3 import S3DataBackend from ralph.backends.data.swift import SwiftDataBackend -from ralph.backends.http.async_lrs import AsyncLRSHTTPBackend -from ralph.backends.http.lrs import LRSHTTPBackend from ralph.backends.loader import ( get_backends, get_cli_backends, @@ -61,13 +61,13 @@ def test_backends_loader_get_cli_backends(): """Test the `get_cli_backends` function.""" assert get_cli_backends() == { "async_es": AsyncESDataBackend, - "async_lrs": 
AsyncLRSHTTPBackend, + "async_lrs": AsyncLRSDataBackend, "async_mongo": AsyncMongoDataBackend, "clickhouse": ClickHouseDataBackend, "es": ESDataBackend, "fs": FSDataBackend, "ldp": LDPDataBackend, - "lrs": LRSHTTPBackend, + "lrs": LRSDataBackend, "mongo": MongoDataBackend, "s3": S3DataBackend, "swift": SwiftDataBackend, @@ -79,12 +79,12 @@ def test_backends_loader_get_cli_write_backends(): """Test the `get_cli_write_backends` function.""" assert get_cli_write_backends() == { "async_es": AsyncESDataBackend, - "async_lrs": AsyncLRSHTTPBackend, + "async_lrs": AsyncLRSDataBackend, "async_mongo": AsyncMongoDataBackend, "clickhouse": ClickHouseDataBackend, "es": ESDataBackend, "fs": FSDataBackend, - "lrs": LRSHTTPBackend, + "lrs": LRSDataBackend, "mongo": MongoDataBackend, "s3": S3DataBackend, "swift": SwiftDataBackend, diff --git a/tests/conftest.py b/tests/conftest.py index 6b1050e95..4ae97197c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,10 +27,12 @@ es_forwarding, es_lrs_backend, events, + flavor, fs_backend, fs_lrs_backend, ldp_backend, lrs, + lrs_backend, mongo, mongo_backend, mongo_forwarding, diff --git a/tests/fixtures/backends.py b/tests/fixtures/backends.py index c04f75320..60224670c 100644 --- a/tests/fixtures/backends.py +++ b/tests/fixtures/backends.py @@ -6,9 +6,10 @@ import random import time from contextlib import asynccontextmanager -from functools import lru_cache +from functools import lru_cache, wraps from multiprocessing import Process from pathlib import Path +from typing import Callable, Optional, Union import boto3 import botocore @@ -18,10 +19,12 @@ import websockets from elasticsearch import BadRequestError, Elasticsearch from httpx import AsyncClient, ConnectError +from pydantic import AnyHttpUrl, parse_obj_as from pymongo import MongoClient from pymongo.errors import CollectionInvalid from ralph.backends.data.async_es import AsyncESDataBackend +from ralph.backends.data.async_lrs import AsyncLRSDataBackend from 
ralph.backends.data.async_mongo import AsyncMongoDataBackend from ralph.backends.data.clickhouse import ( ClickHouseClientOptions, @@ -30,6 +33,7 @@ from ralph.backends.data.es import ESDataBackend from ralph.backends.data.fs import FSDataBackend from ralph.backends.data.ldp import LDPDataBackend +from ralph.backends.data.lrs import LRSDataBackend, LRSHeaders from ralph.backends.data.mongo import MongoDataBackend from ralph.backends.data.s3 import S3DataBackend from ralph.backends.data.swift import SwiftDataBackend @@ -241,6 +245,73 @@ def anyio_backend(): return "asyncio" +@pytest.fixture(params=["sync", "async"]) +def flavor(request): + """Parametrize fixture with `sync`/`async` flavor.""" + return request.param + + +@pytest.mark.anyio +@pytest.fixture +def lrs_backend( + flavor, +) -> Callable[[Optional[str]], Union[LRSDataBackend, AsyncLRSDataBackend]]: + """Return the `get_lrs_test_backend` function.""" + backend_class = LRSDataBackend if flavor == "sync" else AsyncLRSDataBackend + + def make_awaitable(sync_func): + """Make a synchronous callable awaitable.""" + + @wraps(sync_func) + async def async_func(*args, **kwargs): + kwargs.pop("concurrency", None) + return sync_func(*args, **kwargs) + + return async_func + + def make_awaitable_generator(sync_func): + """Make a synchronous generator awaitable.""" + + @wraps(sync_func) + async def async_func(*args, **kwargs): + kwargs.pop("prefetch", None) + for item in sync_func(*args, **kwargs): + yield item + + return async_func + + def _get_lrs_test_backend( + base_url: Optional[str] = "http://fake-lrs.com", + ) -> Union[LRSDataBackend, AsyncLRSDataBackend]: + """Return an (Async)LRSDataBackend backend instance using test defaults.""" + headers = { + "X_EXPERIENCE_API_VERSION": "1.0.3", + "CONTENT_TYPE": "application/json", + } + settings = backend_class.settings_class( + BASE_URL=parse_obj_as(AnyHttpUrl, base_url), + USERNAME="user", + PASSWORD="pass", + HEADERS=LRSHeaders.parse_obj(headers), + 
LOCALE_ENCODING="utf8", + STATUS_ENDPOINT="/__heartbeat__", + STATEMENTS_ENDPOINT="/xAPI/statements/", + READ_CHUNK_SIZE=500, + WRITE_CHUNK_SIZE=500, + ) + backend = backend_class(settings) + + if isinstance(backend, LRSDataBackend): + backend.status = make_awaitable(backend.status) # type: ignore + backend.read = make_awaitable_generator(backend.read) # type: ignore + backend.write = make_awaitable(backend.write) # type: ignore + backend.close = make_awaitable(backend.close) # type: ignore + + return backend + + return _get_lrs_test_backend + + @pytest.fixture def async_mongo_backend(): """Return the `get_mongo_data_backend` function.""" diff --git a/tests/test_cli.py b/tests/test_cli.py index ae097ec38..c4e6df334 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -882,11 +882,11 @@ def test_cli_write_command_with_fs_backend(fs): assert result.exit_code == 1 assert "file1 already exists and overwrite is not allowed" in result.output - # Try to create the same file with -f + # Try to create the same file with -o update runner = CliRunner() result = runner.invoke( cli, - "write -b fs -t file1 -f --fs-default-directory-path foo".split(), + "write -b fs -t file1 -o update --fs-default-directory-path foo".split(), input=b"other content", ) diff --git a/tests/test_cli_usage.py b/tests/test_cli_usage.py index 6c19651ed..54c269169 100644 --- a/tests/test_cli_usage.py +++ b/tests/test_cli_usage.py @@ -127,10 +127,13 @@ def test_cli_read_command_usage(): " async_lrs backend: \n" " --async-lrs-base-url TEXT\n" " --async-lrs-headers KEY=VALUE,KEY=VALUE\n" + " --async-lrs-locale-encoding TEXT\n" " --async-lrs-password TEXT\n" + " --async-lrs-read-chunk-size INTEGER\n" " --async-lrs-statements-endpoint TEXT\n" " --async-lrs-status-endpoint TEXT\n" " --async-lrs-username TEXT\n" + " --async-lrs-write-chunk-size INTEGER\n" " async_mongo backend: \n" " --async-mongo-client-options KEY=VALUE,KEY=VALUE\n" " --async-mongo-connection-uri MONGODSN\n" @@ -180,10 +183,13 @@ def 
test_cli_read_command_usage(): " lrs backend: \n" " --lrs-base-url TEXT\n" " --lrs-headers KEY=VALUE,KEY=VALUE\n" + " --lrs-locale-encoding TEXT\n" " --lrs-password TEXT\n" + " --lrs-read-chunk-size INTEGER\n" " --lrs-statements-endpoint TEXT\n" " --lrs-status-endpoint TEXT\n" " --lrs-username TEXT\n" + " --lrs-write-chunk-size INTEGER\n" " mongo backend: \n" " --mongo-client-options KEY=VALUE,KEY=VALUE\n" " --mongo-connection-uri MONGODSN\n" @@ -219,7 +225,7 @@ def test_cli_read_command_usage(): " --swift-write-chunk-size INTEGER\n" " ws backend: \n" " --ws-uri TEXT\n" - " -c, --chunk-size INTEGER Get events by chunks of size #\n" + " -s, --chunk-size INTEGER Get events by chunks of size #\n" " -t, --target TEXT Endpoint from which to read events (e.g.\n" " `/statements`)\n" ' -q, --query \'{"KEY": "VALUE", "KEY": "VALUE"}\'\n' @@ -229,8 +235,8 @@ def test_cli_read_command_usage(): " -i, --ignore_errors BOOLEAN Ignore errors during the encoding operation." "\n" " [default: False]\n" - " --help Show this message and exit." 
- ) in result.output + " --help Show this message and exit.\n" + ) == result.output logging.warning(result.output) result = runner.invoke(cli, ["read"]) assert result.exit_code > 0 @@ -346,7 +352,7 @@ def test_cli_list_command_usage(): " -n, --new / -a, --all List not fetched (or all) documents\n" " -D, --details / -I, --ids Get documents detailed output (JSON)\n" " --help Show this message and exit.\n" - ) in result.output + ) == result.output result = runner.invoke(cli, ["list"]) assert result.exit_code > 0 @@ -385,10 +391,13 @@ def test_cli_write_command_usage(): " async_lrs backend: \n" " --async-lrs-base-url TEXT\n" " --async-lrs-headers KEY=VALUE,KEY=VALUE\n" + " --async-lrs-locale-encoding TEXT\n" " --async-lrs-password TEXT\n" + " --async-lrs-read-chunk-size INTEGER\n" " --async-lrs-statements-endpoint TEXT\n" " --async-lrs-status-endpoint TEXT\n" " --async-lrs-username TEXT\n" + " --async-lrs-write-chunk-size INTEGER\n" " async_mongo backend: \n" " --async-mongo-client-options KEY=VALUE,KEY=VALUE\n" " --async-mongo-connection-uri MONGODSN\n" @@ -427,10 +436,13 @@ def test_cli_write_command_usage(): " lrs backend: \n" " --lrs-base-url TEXT\n" " --lrs-headers KEY=VALUE,KEY=VALUE\n" + " --lrs-locale-encoding TEXT\n" " --lrs-password TEXT\n" + " --lrs-read-chunk-size INTEGER\n" " --lrs-statements-endpoint TEXT\n" " --lrs-status-endpoint TEXT\n" " --lrs-username TEXT\n" + " --lrs-write-chunk-size INTEGER\n" " mongo backend: \n" " --mongo-client-options KEY=VALUE,KEY=VALUE\n" " --mongo-connection-uri MONGODSN\n" @@ -464,22 +476,18 @@ def test_cli_write_command_usage(): " --swift-username TEXT\n" " --swift-user-domain-name TEXT\n" " --swift-write-chunk-size INTEGER\n" - " -c, --chunk-size INTEGER Get events by chunks of size #\n" - " -f, --force Overwrite existing archives or records\n" + " -t, --target TEXT The target container to write into\n" + " -s, --chunk-size INTEGER Get events by chunks of size #\n" " -I, --ignore-errors Continue writing regardless of 
raised errors" "\n" - " -s, --simultaneous With HTTP backend, POST all chunks\n" - " simultaneously (instead of sequentially)\n" - " -m, --max-num-simultaneous INTEGER\n" - " The maximum number of chunks to send at once" - ",\n" - " when using `--simultaneous`. Use `-1` to not" - "\n" - " set a limit.\n" - " -t, --target TEXT The target container to write into\n" + " -o, --operation-type OP_TYPE Either index, create, delete, update or " + "append\n" + " -c, --concurrency INTEGER Number of chunks to write concurrently. (" + "async\n" + " backends only)\n" " --help Show this message and exit.\n" ) - assert expected_output in result.output + assert expected_output == result.output result = runner.invoke(cli, ["write"]) assert result.exit_code > 0 diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 88730ced5..1e2adb742 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -7,6 +7,8 @@ import pytest from click.testing import CliRunner +from ralph import cli + def test_dependencies_ralph_command_requires_click(monkeypatch): """Test Click module installation while executing the ralph command.""" @@ -30,16 +32,10 @@ def test_dependencies_ralph_command_requires_click(monkeypatch): def test_dependencies_runserver_subcommand_requires_uvicorn(monkeypatch): """Test Uvicorn module installation while executing the runserver sub command.""" - monkeypatch.setitem(sys.modules, "uvicorn", None) - - # Force ralph.cli reload now that uvicorn is considered as missing - if "ralph.cli" in sys.modules: - del sys.modules["ralph.cli"] - cli = importlib.import_module("ralph.cli") - + monkeypatch.delattr(cli, "uvicorn") + monkeypatch.setattr(cli, "configure_logging", lambda: None) runner = CliRunner() result = runner.invoke(cli.cli, "runserver -b es".split()) - assert isinstance(result.exception, ModuleNotFoundError) assert str(result.exception) == ( "You need to install 'lrs' optional dependencies to use the runserver "