From 81270a6c5b53b871824480c698c973eff1f5523f Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Mon, 18 Sep 2023 08:14:25 +0200 Subject: [PATCH] Support managing index templates with DiracX --- src/diracx/db/__main__.py | 3 +- src/diracx/db/os/job_parameters.py | 28 ++++++------- src/diracx/db/os/utils.py | 12 +++++- tests/db/opensearch/conftest.py | 26 ++++++++---- tests/db/opensearch/test_index_template.py | 48 ++++++++++++++++++++++ 5 files changed, 91 insertions(+), 26 deletions(-) create mode 100644 tests/db/opensearch/test_index_template.py diff --git a/src/diracx/db/__main__.py b/src/diracx/db/__main__.py index abd7f3e0f..b79e0281b 100644 --- a/src/diracx/db/__main__.py +++ b/src/diracx/db/__main__.py @@ -46,8 +46,7 @@ async def init_os(): logger.info("Initialising %s", db_name) db = BaseOSDB.available_implementations(db_name)[0](db_url) async with db.client_context(): - # TODO: Implement - raise NotImplementedError("Create index templates and do any other setup") + await db.create_index_template() if __name__ == "__main__": diff --git a/src/diracx/db/os/job_parameters.py b/src/diracx/db/os/job_parameters.py index 0e1e888d2..7c38a83ef 100644 --- a/src/diracx/db/os/job_parameters.py +++ b/src/diracx/db/os/job_parameters.py @@ -4,21 +4,19 @@ class JobParametersDB(BaseOSDB): - mapping = { - "properties": { - "JobID": {"type": "long"}, - "timestamp": {"type": "date"}, - "CPUNormalizationFactor": {"type": "long"}, - "NormCPUTime(s)": {"type": "long"}, - "Memory(kB)": {"type": "long"}, - "TotalCPUTime(s)": {"type": "long"}, - "MemoryUsed(kb)": {"type": "long"}, - "HostName": {"type": "keyword"}, - "GridCE": {"type": "keyword"}, - "ModelName": {"type": "keyword"}, - "Status": {"type": "keyword"}, - "JobType": {"type": "keyword"}, - } + fields = { + "JobID": {"type": "long"}, + "timestamp": {"type": "date"}, + "CPUNormalizationFactor": {"type": "long"}, + "NormCPUTime(s)": {"type": "long"}, + "Memory(kB)": {"type": "long"}, + "TotalCPUTime(s)": {"type": "long"}, + "MemoryUsed(kb)": {"type": "long"}, + "HostName": {"type": "keyword"}, + "GridCE": {"type": "keyword"}, + "ModelName": {"type": "keyword"}, + "Status": {"type": "keyword"}, + "JobType": {"type": "keyword"}, } index_prefix = "mysetup_elasticjobparameters_index_" diff --git a/src/diracx/db/os/utils.py b/src/diracx/db/os/utils.py index e82bda355..4c7125f41 100644 --- a/src/diracx/db/os/utils.py +++ b/src/diracx/db/os/utils.py @@ -26,7 +26,7 @@ class OpenSearchDBUnavailable(OpenSearchDBError): class BaseOSDB(metaclass=ABCMeta): # TODO: Make metadata an abstract property - mapping: dict + fields: dict index_prefix: str @abstractmethod @@ -105,6 +105,16 @@ async def __aexit__(self, exc_type, exc, tb): self._client = None return + async def create_index_template(self) -> None: + template_body = { + "template": {"mappings": {"properties": self.fields}}, + "index_patterns": [f"{self.index_prefix}*"], + } + result = await self.client.indices.put_index_template( + name=self.index_prefix, body=template_body + ) + assert result["acknowledged"] + async def upsert(self, doc_id, document) -> None: # TODO: Implement properly response = await self.client.update( diff --git a/tests/db/opensearch/conftest.py b/tests/db/opensearch/conftest.py index f5de2441a..e42570771 100644 --- a/tests/db/opensearch/conftest.py +++ b/tests/db/opensearch/conftest.py @@ -25,13 +25,12 @@ class DummyOSDB(BaseOSDB): test runs are independent of each other. """ - mapping = { - "properties": { - "DateField": {"type": "date"}, - "IntegerField": {"type": "long"}, - "KeywordField1": {"type": "keyword"}, - "KeywordField2": {"type": "keyword"}, - } + fields = { + "DateField": {"type": "date"}, + "IntegerField": {"type": "long"}, + "KeywordField1": {"type": "keyword"}, + "KeywordField2": {"type": "keyword"}, + "TextField": {"type": "text"}, } def __init__(self, *args, **kwargs): @@ -69,8 +68,19 @@ def opensearch_conn_kwargs(demo_kubectl_env): @pytest.fixture -async def dummy_opensearch_db(opensearch_conn_kwargs): +async def dummy_opensearch_db_without_template(opensearch_conn_kwargs): """Fixture which returns a DummyOSDB object.""" db = DummyOSDB(opensearch_conn_kwargs) async with db.client_context(): yield db + # Clean up after the test + await db.client.indices.delete(index=f"{db.index_prefix}*") + + +@pytest.fixture +async def dummy_opensearch_db(dummy_opensearch_db_without_template): + """Fixture which returns a DummyOSDB object with the index template applied.""" + db = dummy_opensearch_db_without_template + await db.create_index_template() + yield db + await db.client.indices.delete_index_template(name=db.index_prefix) diff --git a/tests/db/opensearch/test_index_template.py b/tests/db/opensearch/test_index_template.py new file mode 100644 index 000000000..c01a516cc --- /dev/null +++ b/tests/db/opensearch/test_index_template.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +import opensearchpy +import pytest + +from .conftest import DummyOSDB + +DUMMY_DOCUMENT = { + "DateField": datetime.now(tz=timezone.utc), + "IntegerField": 1234, + "KeywordField1": "keyword1", + "KeywordField2": "keyword two", + "TextField": "text value", +} + + +async def test_applies_new_indices(dummy_opensearch_db: DummyOSDB): + """Ensure that the index template is applied to new indices.""" + index_mappings = await _get_test_index_mappings(dummy_opensearch_db) + # Ensure the index template was applied during index creation + assert index_mappings == {"properties": dummy_opensearch_db.fields} + + +async def dummy_opensearch_db_without_template(dummy_opensearch_db: DummyOSDB): + """Sanity test that previous test fails if there isn't a template.""" + index_mappings = await _get_test_index_mappings(dummy_opensearch_db) + # Ensure the mappings are different to the expected ones + assert index_mappings != {"properties": dummy_opensearch_db.fields} + + +async def _get_test_index_mappings(dummy_opensearch_db: DummyOSDB): + document_id = 1 + index_name = dummy_opensearch_db.index_name(document_id) + + # At this point the index should not exist yet + with pytest.raises(opensearchpy.exceptions.NotFoundError): + await dummy_opensearch_db.client.indices.get_mapping(index_name) + + # Insert document which will automatically create the index based on the template + await dummy_opensearch_db.upsert(document_id, DUMMY_DOCUMENT) + + # Ensure the result looks as expected and return the mappings + index_mapping = await dummy_opensearch_db.client.indices.get_mapping(index_name) + assert list(index_mapping) == [index_name] + assert list(index_mapping[index_name]) == ["mappings"] + return index_mapping[index_name]["mappings"]