diff --git a/tests/conftest.py b/tests/conftest.py index 85ac197df..a045794d7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,10 @@ from __future__ import annotations import contextlib +import os +import subprocess from datetime import datetime, timedelta +from pathlib import Path from uuid import uuid4 import pytest @@ -30,6 +33,12 @@ def pytest_addoption(parser): default=False, help="Regenerate the AutoREST client", ) + parser.addoption( + "--demo-dir", + type=Path, + default=None, + help="Path to a running diracx-demo directory", + ) def pytest_collection_modifyitems(config, items): @@ -210,3 +219,21 @@ def admin_user_client(test_client, test_auth_settings): test_client.headers["Authorization"] = f"Bearer {token}" test_client.dirac_token_payload = payload yield test_client + + +@pytest.fixture(scope="session") +def demo_kubectl_env(request): + demo_dir = request.config.getoption("--demo-dir") + if demo_dir is None: + pytest.skip("Requires a running instance of the DiracX demo") + kube_conf = demo_dir / ".demo" / "kube.conf" + if not kube_conf.exists(): + raise RuntimeError(f"Could not find {kube_conf}, is the demo running?") + env = { + **os.environ, + "KUBECONFIG": str(kube_conf), + "PATH": os.environ["PATH"] + ":" + str(demo_dir / ".demo"), + } + pods_result = subprocess.check_output(["kubectl", "get", "pods"], env=env) + assert pods_result + yield env diff --git a/tests/db/opensearch/__init__.py b/tests/db/opensearch/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/db/opensearch/conftest.py b/tests/db/opensearch/conftest.py new file mode 100644 index 000000000..f5de2441a --- /dev/null +++ b/tests/db/opensearch/conftest.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import secrets +import socket +import subprocess + +import pytest + +from diracx.db.os.utils import BaseOSDB + +OPENSEARCH_PORT = 28000 + + +def require_port_availability(port: int) -> bool: + """Raise an exception if the given port is already in use.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(("localhost", port)) == 0: + raise RuntimeError(f"This test requires port {port} to be available") + + +class DummyOSDB(BaseOSDB): + """Example DiracX OpenSearch database class for testing. + + A new random prefix is created each time the class is defined to ensure + test runs are independent of each other. + """ + + mapping = { + "properties": { + "DateField": {"type": "date"}, + "IntegerField": {"type": "long"}, + "KeywordField1": {"type": "keyword"}, + "KeywordField2": {"type": "keyword"}, + } + } + + def __init__(self, *args, **kwargs): + # Randomize the index prefix to ensure tests are independent + self.index_prefix = f"dummy_{secrets.token_hex(8)}" + super().__init__(*args, **kwargs) + + def index_name(self, doc_id: int) -> str: + return f"{self.index_prefix}-{doc_id // 1e6:.0f}m" + + +@pytest.fixture(scope="session") +def opensearch_conn_kwargs(demo_kubectl_env): + """Fixture which forwards a port from the diracx-demo and returns the connection kwargs.""" + require_port_availability(OPENSEARCH_PORT) + command = [ + "kubectl", + "port-forward", + "service/opensearch-cluster-master", + f"{OPENSEARCH_PORT}:9200", + ] + with subprocess.Popen( + command, stdout=subprocess.PIPE, universal_newlines=True, env=demo_kubectl_env + ) as proc: + for line in proc.stdout: + if line.startswith("Forwarding from"): + yield { + "hosts": f"admin:admin@localhost:{OPENSEARCH_PORT}", + "use_ssl": True, + "verify_certs": False, + } + proc.kill() + break + proc.wait() + + +@pytest.fixture +async def dummy_opensearch_db(opensearch_conn_kwargs): + """Fixture which returns a DummyOSDB object.""" + db = DummyOSDB(opensearch_conn_kwargs) + async with db.client_context(): + yield db diff --git a/tests/db/opensearch/test_connection.py b/tests/db/opensearch/test_connection.py new file mode 100644 index 000000000..48431a6c4 --- /dev/null +++ b/tests/db/opensearch/test_connection.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import pytest + +from diracx.db.os.utils import OpenSearchDBUnavailable + +from .conftest import OPENSEARCH_PORT, DummyOSDB, require_port_availability + + +async def _ensure_db_unavailable(db: DummyOSDB): + """Helper function which raises an exception if we manage to connect to the DB.""" + # Normally we would use "async with db.client_context()" but here + # __aenter__ is used explicitly to ensure the exception is raised + # while entering the context manager + acm = db.client_context() + with pytest.raises(OpenSearchDBUnavailable): + await acm.__aenter__() + + +async def test_connection(dummy_opensearch_db: DummyOSDB): + """Ensure we can connect to the OpenSearch database.""" + assert await dummy_opensearch_db.client.ping() + + +async def test_connection_error_bad_port(opensearch_conn_kwargs): + """Check the connection behavior when the DB is unavailable. + + This failure mode is emulated by changing the port number. + """ + require_port_availability(28001) + assert f":{OPENSEARCH_PORT}" in opensearch_conn_kwargs["hosts"] + db = DummyOSDB( + { + **opensearch_conn_kwargs, + "hosts": opensearch_conn_kwargs["hosts"].replace( + f":{OPENSEARCH_PORT}", ":28001" + ), + } + ) + await _ensure_db_unavailable(db) + + +async def test_connection_error_ssl(opensearch_conn_kwargs): + """Check the connection behavior when there is an SSL error.""" + db = DummyOSDB({**opensearch_conn_kwargs, "use_ssl": False}) + await _ensure_db_unavailable(db) + + +async def test_connection_error_certs(opensearch_conn_kwargs): + """Check the connection behavior when there is an certificate verification error.""" + db = DummyOSDB({**opensearch_conn_kwargs, "verify_certs": True}) + await _ensure_db_unavailable(db) + + +async def test_connection_error_bad_username(opensearch_conn_kwargs): + """Check the connection behavior when the username is incorrect.""" + assert "admin:admin" in opensearch_conn_kwargs["hosts"] + db = DummyOSDB( + { + **opensearch_conn_kwargs, + "hosts": opensearch_conn_kwargs["hosts"].replace( + "admin:admin", "nobody:admin" + ), + } + ) + await _ensure_db_unavailable(db) + + +async def test_connection_error_bad_password(opensearch_conn_kwargs): + """Check the connection behavior when the password is incorrect.""" + assert "admin:admin" in opensearch_conn_kwargs["hosts"] + db = DummyOSDB( + { + **opensearch_conn_kwargs, + "hosts": opensearch_conn_kwargs["hosts"].replace( + "admin:admin", "admin:wrong" + ), + } + ) + await _ensure_db_unavailable(db)