🏗️(backends) migrate LRSHTTPBackend to LRSDataBackend #524

Merged
merged 1 commit on Dec 12, 2023
14 changes: 7 additions & 7 deletions .env.dist
@@ -136,13 +136,13 @@ RALPH_BACKENDS__DATA__CLICKHOUSE__TEST_TABLE_NAME=test_xapi_events_all

# LRS HTTP backend

RALPH_BACKENDS__HTTP__LRS__BASE_URL=http://ralph:[email protected]:8100/
RALPH_BACKENDS__HTTP__LRS__USERNAME=ralph
RALPH_BACKENDS__HTTP__LRS__PASSWORD=secret
RALPH_BACKENDS__HTTP__LRS__HEADERS__X_EXPERIENCE_API_VERSION=1.0.3
RALPH_BACKENDS__HTTP__LRS__HEADERS__CONTENT_TYPE=application/json
RALPH_BACKENDS__HTTP__LRS__STATUS_ENDPOINT=/__heartbeat__
RALPH_BACKENDS__HTTP__LRS__STATEMENTS_ENDPOINT=/xAPI/statements
RALPH_BACKENDS__DATA__LRS__BASE_URL=http://ralph:[email protected]:8100/
RALPH_BACKENDS__DATA__LRS__USERNAME=ralph
RALPH_BACKENDS__DATA__LRS__PASSWORD=secret
RALPH_BACKENDS__DATA__LRS__HEADERS__X_EXPERIENCE_API_VERSION=1.0.3
RALPH_BACKENDS__DATA__LRS__HEADERS__CONTENT_TYPE=application/json
RALPH_BACKENDS__DATA__LRS__STATUS_ENDPOINT=/__heartbeat__
RALPH_BACKENDS__DATA__LRS__STATEMENTS_ENDPOINT=/xAPI/statements

# Sentry

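The renamed variables above follow Ralph's double-underscore convention, where each `__`-delimited segment selects a deeper level of the backend settings (here, `DATA` instead of the retired `HTTP` namespace). As an illustrative sketch of that convention only — this is not Ralph's actual parsing code — the names can be resolved into a nested structure like this:

```python
def to_nested(environ, prefix="RALPH_BACKENDS__"):
    """Resolve '__'-delimited variable names into a nested mapping (illustrative)."""
    tree = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue
        node = tree
        # Split the suffix into path segments; the last one is the setting name.
        *path, leaf = key[len(prefix):].split("__")
        for part in path:
            node = node.setdefault(part, {})
        node[leaf] = value
    return tree

env = {
    "RALPH_BACKENDS__DATA__LRS__USERNAME": "ralph",
    "RALPH_BACKENDS__DATA__LRS__STATEMENTS_ENDPOINT": "/xAPI/statements",
}
print(to_nested(env))
# {'DATA': {'LRS': {'USERNAME': 'ralph', 'STATEMENTS_ENDPOINT': '/xAPI/statements'}}}
```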
22 changes: 8 additions & 14 deletions docs/backends/index.md
@@ -28,7 +28,7 @@ ralph list --backend swift # [...] more options
The general patterns for backend parameters are:

- `--{{ backend_name }}-{{ parameter | underscore_to_dash }}` for command options, and,
- `RALPH_BACKENDS__{{ backend_type | uppercase }}__{{ backend_name | uppercase }}__{{ parameter | uppercase }}` for environment variables, where the `backend_type` is one of `DATA`, `LRS` and `STREAM`.
- `RALPH_BACKENDS__DATA__{{ backend_name | uppercase }}__{{ parameter | uppercase }}` for environment variables.

## Elasticsearch

@@ -69,6 +69,11 @@ documents from it.
members:
- attributes

The ClickHouse client options supported in Ralph can be found in these locations:

- [Python driver specific](https://clickhouse.com/docs/en/integrations/language-clients/python/driver-api#settings-argument)
- [General ClickHouse client settings](https://clickhouse.com/docs/en/operations/settings/settings/)

## OVH - Log Data Platform (LDP)

LDP is a nice service built by OVH on top of Graylog to follow, analyse and
@@ -150,23 +155,12 @@ The only required parameter is the `path` we want to list or stream content from
members:
- attributes

The ClickHouse client options supported in Ralph can be found in these locations:

- [Python driver specific](https://clickhouse.com/docs/en/integrations/language-clients/python/driver-api#settings-argument)
- [General ClickHouse client settings](https://clickhouse.com/docs/en/operations/settings/settings/)

## Learning Record Store (LRS) - HTTP backend interface

!!! warning

    `HTTP` backend type will soon be merged into the `Data` backend type.

    PR [#521](https://github.com/openfun/ralph/pull/521) prepares `Data` backend alignments with `HTTP` backend `read` method signature
## Learning Record Store (LRS)

The LRS backend is used to store and retrieve xAPI statements from various systems that follow the [xAPI specification](https://github.com/adlnet/xAPI-Spec/tree/master) (such as our own Ralph LRS, which can be run from this package).
LRS systems are mostly used in e-learning infrastructures.

### ::: ralph.backends.http.async_lrs.LRSHTTPBackendSettings
### ::: ralph.backends.data.lrs.LRSDataBackendSettings
handler: python
options:
show_root_heading: false
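The docs file above describes the general naming pattern for backend parameters: a dashed command-line option and an upper-cased, `__`-delimited environment variable. A minimal sketch of that rule (the two helpers are hypothetical, not part of Ralph):

```python
def cli_option(backend_name: str, parameter: str) -> str:
    """Build the command-line option name for a backend parameter."""
    return f"--{backend_name}-{parameter.replace('_', '-')}"

def env_variable(backend_name: str, parameter: str) -> str:
    """Build the environment variable name for a data backend parameter."""
    return f"RALPH_BACKENDS__DATA__{backend_name.upper()}__{parameter.upper()}"

print(cli_option("lrs", "status_endpoint"))    # --lrs-status-endpoint
print(env_variable("lrs", "status_endpoint"))  # RALPH_BACKENDS__DATA__LRS__STATUS_ENDPOINT
```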
295 changes: 295 additions & 0 deletions src/ralph/backends/data/async_lrs.py
@@ -0,0 +1,295 @@
"""Async LRS data backend for Ralph."""

from io import IOBase
from typing import AsyncIterator, Iterable, Optional, Union
from urllib.parse import ParseResult, parse_qs, urljoin, urlparse

from httpx import AsyncClient, HTTPError, HTTPStatusError, RequestError
from pydantic import AnyHttpUrl, PositiveInt, parse_obj_as

from ralph.backends.data.base import (
    AsyncWritable,
    BaseAsyncDataBackend,
    BaseOperationType,
    DataBackendStatus,
    Loggable,
)
from ralph.backends.data.lrs import LRSDataBackendSettings, StatementResponse
from ralph.backends.lrs.base import LRSStatementsQuery
from ralph.exceptions import BackendException
from ralph.utils import async_parse_dict_to_bytes, iter_by_batch, parse_iterable_to_dict


class AsyncLRSDataBackend(
    BaseAsyncDataBackend[LRSDataBackendSettings, LRSStatementsQuery],
    AsyncWritable,
    Loggable,
):
    """Asynchronous LRS data backend."""

    name = "async_lrs"
    default_operation_type = BaseOperationType.CREATE
    unsupported_operation_types = {
        BaseOperationType.APPEND,
        BaseOperationType.UPDATE,
        BaseOperationType.DELETE,
    }

    def __init__(self, settings: Optional[LRSDataBackendSettings] = None) -> None:
        """Instantiate the LRS data backend client (HTTP basic auth).

        Args:
            settings (LRSDataBackendSettings or None): The LRS data backend settings.
                If `settings` is `None`, a default settings instance is used instead.
        """
        super().__init__(settings)
        self.base_url = parse_obj_as(AnyHttpUrl, self.settings.BASE_URL)
        self.auth = (self.settings.USERNAME, self.settings.PASSWORD)
        self._client = None

    @property
    def client(self) -> AsyncClient:
        """Create a `httpx.AsyncClient` if it doesn't exist."""
        if not self._client:
            headers = self.settings.HEADERS.dict(by_alias=True)
            self._client = AsyncClient(auth=self.auth, headers=headers)
        return self._client

    async def status(self) -> DataBackendStatus:
        """Check the LRS server status."""
        status_url = urljoin(self.base_url, self.settings.STATUS_ENDPOINT)
        try:
            response = await self.client.get(status_url)
            response.raise_for_status()
        except RequestError:
            self.logger.error("Unable to request the server")
            return DataBackendStatus.AWAY
        except HTTPStatusError:
            self.logger.error("Response raised an HTTP status of 4xx or 5xx")
            return DataBackendStatus.ERROR

        return DataBackendStatus.OK

    async def read(  # noqa: PLR0913
        self,
        query: Optional[Union[str, LRSStatementsQuery]] = None,
        target: Optional[str] = None,
        chunk_size: Optional[int] = None,
        raw_output: bool = False,
        ignore_errors: bool = False,
        prefetch: Optional[PositiveInt] = None,
        max_statements: Optional[PositiveInt] = None,
    ) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]:
        """Get statements from the LRS `target` endpoint.

        The `read` method defined in the LRS specification returns a `statements`
        array and a `more` IRL. The latter is required when the returned
        `statements` list has been limited.

        Args:
            query (str, LRSStatementsQuery): The query to select records to read.
            target (str): Endpoint from which to read data (e.g. `/statements`).
                If target is `None`, the `/xAPI/statements` default endpoint is used.
            chunk_size (int or None): The number of records or bytes to read in one
                batch, depending on whether the records are dictionaries or bytes.
            raw_output (bool): Controls whether to yield bytes or dictionaries.
                If the records are dictionaries and `raw_output` is set to `True`,
                they are encoded as JSON.
                If the records are bytes and `raw_output` is set to `False`, they
                are decoded as JSON by line.
            ignore_errors (bool): If `True`, errors during the read operation
                are ignored and logged. If `False` (default), a `BackendException`
                is raised if an error occurs.
            prefetch (int): The number of records to prefetch (queue) while
                yielding. If `prefetch` is `None` it defaults to `1` - no records
                are prefetched.
            max_statements (int): The maximum number of statements to yield.
        """
        statements = super().read(
            query,
            target,
            chunk_size,
            raw_output,
            ignore_errors,
            prefetch,
            max_statements,
        )
        async for statement in statements:
            yield statement

    async def _read_bytes(
        self,
        query: LRSStatementsQuery,
        target: Optional[str],
        chunk_size: int,
        ignore_errors: bool,
    ) -> AsyncIterator[bytes]:
        """Method called by `self.read` yielding bytes. See `self.read`."""
        statements = self._read_dicts(query, target, chunk_size, ignore_errors)
        async for statement in async_parse_dict_to_bytes(
            statements, self.settings.LOCALE_ENCODING, ignore_errors, self.logger
        ):
            yield statement

    async def _read_dicts(
        self,
        query: LRSStatementsQuery,
        target: Optional[str],
        chunk_size: int,
        ignore_errors: bool,  # noqa: ARG002
    ) -> AsyncIterator[dict]:
        """Method called by `self.read` yielding dictionaries. See `self.read`."""
        if not target:
            target = self.settings.STATEMENTS_ENDPOINT

        if query.limit:
            self.logger.warning(
                "The limit query parameter value is overwritten by the chunk_size "
                "parameter value."
            )

        query.limit = chunk_size

        # Create request URL
        target = ParseResult(
            scheme=urlparse(self.base_url).scheme,
            netloc=urlparse(self.base_url).netloc,
            path=target,
            query="",
            params="",
            fragment="",
        ).geturl()

        statements = self._fetch_statements(
            target=target,
            query_params=query.dict(exclude_none=True, exclude_unset=True),
        )

        # Iterate through results
        try:
            async for statement in statements:
                yield statement
        except HTTPError as error:
            msg = "Failed to fetch statements: %s"
            self.logger.error(msg, error)
            raise BackendException(msg % (error,)) from error

    async def write(  # noqa: PLR0913
        self,
        data: Union[IOBase, Iterable[bytes], Iterable[dict]],
        target: Optional[str] = None,
        chunk_size: Optional[int] = None,
        ignore_errors: bool = False,
        operation_type: Optional[BaseOperationType] = None,
        concurrency: Optional[int] = None,
    ) -> int:
        """Write `data` records to the `target` endpoint and return their count.

        Args:
            data (Iterable): The data to write.
            target (str or None): Endpoint in which to write data
                (e.g. `/statements`). If `target` is `None`, the
                `/xAPI/statements` default endpoint is used.
            chunk_size (int or None): The number of records or bytes to write in
                one batch, depending on whether `data` contains dictionaries or
                bytes. If `chunk_size` is `None`, a default value is used instead.
            ignore_errors (bool): If `True`, errors during the write operation
                are ignored and logged. If `False` (default), a `BackendException`
                is raised if an error occurs.
            operation_type (BaseOperationType or None): The mode of the write
                operation. If `operation_type` is `None`, the
                `default_operation_type` is used instead. See `BaseOperationType`.
            concurrency (int): The number of chunks to write concurrently.
                If `None` it defaults to `1`.
        """
        return await super().write(
            data, target, chunk_size, ignore_errors, operation_type, concurrency
        )

    async def _write_bytes(  # noqa: PLR0913
        self,
        data: Iterable[bytes],
        target: Optional[str],
        chunk_size: int,
        ignore_errors: bool,
        operation_type: BaseOperationType,
    ) -> int:
        """Method called by `self.write` writing bytes. See `self.write`."""
        statements = parse_iterable_to_dict(data, ignore_errors, self.logger)
        return await self._write_dicts(
            statements, target, chunk_size, ignore_errors, operation_type
        )

    async def _write_dicts(  # noqa: PLR0913
        self,
        data: Iterable[dict],
        target: Optional[str],
        chunk_size: int,
        ignore_errors: bool,
        operation_type: BaseOperationType,  # noqa: ARG002
    ) -> int:
        """Method called by `self.write` writing dictionaries. See `self.write`."""
        if not target:
            target = self.settings.STATEMENTS_ENDPOINT

        target = ParseResult(
            scheme=urlparse(self.base_url).scheme,
            netloc=urlparse(self.base_url).netloc,
            path=target,
            query="",
            params="",
            fragment="",
        ).geturl()

        self.logger.debug(
            "Start writing to the %s endpoint (chunk size: %s)", target, chunk_size
        )

        count = 0
        for chunk in iter_by_batch(data, chunk_size):
            count += await self._post_and_raise_for_status(target, chunk, ignore_errors)

        self.logger.debug("Posted %d statements", count)
        return count

    async def close(self) -> None:
        """Close the `httpx.AsyncClient`.

        Raises:
            BackendException: If a failure occurs during the close operation.
        """
        if not self._client:
            self.logger.warning("No backend client to close.")
            return

        await self.client.aclose()

    async def _fetch_statements(self, target, query_params: dict):
        """Fetch statements from an LRS."""
        while True:
            response = await self.client.get(target, params=query_params)
            response.raise_for_status()
            statements_response = StatementResponse(**response.json())
            statements = statements_response.statements
            if isinstance(statements, dict):
                statements = [statements]

            for statement in statements:
                yield statement

            if not statements_response.more:
                break

            query_params.update(parse_qs(urlparse(statements_response.more).query))

    async def _post_and_raise_for_status(self, target, chunk, ignore_errors):
        """POST a chunk of statements to `target` and return the number of insertions."""
        try:
            request = await self.client.post(target, json=chunk)
            request.raise_for_status()
            return len(chunk)
        except HTTPError as error:
            msg = "Failed to post statements: %s"
            if ignore_errors:
                self.logger.warning(msg, error)
                return 0
            self.logger.error(msg, error)
            raise BackendException(msg % (error,)) from error
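The `_fetch_statements` loop above follows the xAPI pagination contract: each response carries a `statements` array and, when the result set was truncated, a `more` IRL pointing at the next page. A minimal synchronous sketch of that contract, driven by a faked two-page LRS response (all names below are illustrative, not Ralph's API):

```python
def fetch_all(get_page, start="/xAPI/statements"):
    """Drain an xAPI statements endpoint by following `more` IRLs."""
    statements, url = [], start
    while url:
        page = get_page(url)
        statements.extend(page["statements"])
        url = page.get("more")  # absent on the last page, which ends the loop
    return statements

# Faked two-page response for illustration:
pages = {
    "/xAPI/statements": {"statements": [{"id": "a"}], "more": "/xAPI/statements?cursor=2"},
    "/xAPI/statements?cursor=2": {"statements": [{"id": "b"}]},
}
print(fetch_all(pages.get))  # [{'id': 'a'}, {'id': 'b'}]
```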
5 changes: 3 additions & 2 deletions src/ralph/backends/data/fs.py
@@ -321,8 +321,8 @@ def _write_bytes(  # noqa: PLR0913
            msg = "Target file not specified; using random file name: %s"
            self.logger.info(msg, target)

        target = Path(target)
        path = target if target.is_absolute() else self.default_directory / target
        path = Path(target)
        path = path if path.is_absolute() else self.default_directory / path

        if operation_type in [BaseOperationType.CREATE, BaseOperationType.INDEX]:
            if path.is_file():
@@ -363,6 +363,7 @@ def _write_bytes(  # noqa: PLR0913
                "timestamp": now(),
            }
        )
        self.logger.debug("Written %s with success", path.absolute())
        return 1

    def close(self) -> None: