Skip to content

Commit

Permalink
Add OpenSearch connection tests
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisburr committed Sep 18, 2023
1 parent 827b4ec commit 8369779
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 0 deletions.
27 changes: 27 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import annotations

import contextlib
import os
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
from uuid import uuid4

import pytest
Expand Down Expand Up @@ -30,6 +33,12 @@ def pytest_addoption(parser):
default=False,
help="Regenerate the AutoREST client",
)
parser.addoption(
"--demo-dir",
type=Path,
default=None,
help="Path to a running diracx-demo directory",
)


def pytest_collection_modifyitems(config, items):
Expand Down Expand Up @@ -210,3 +219,21 @@ def admin_user_client(test_client, test_auth_settings):
test_client.headers["Authorization"] = f"Bearer {token}"
test_client.dirac_token_payload = payload
yield test_client


@pytest.fixture(scope="session")
def demo_kubectl_env(request):
demo_dir = request.config.getoption("--demo-dir")
if demo_dir is None:
pytest.skip("Requires a running instance of the DiracX demo")
kube_conf = demo_dir / ".demo" / "kube.conf"
if not kube_conf.exists():
raise RuntimeError(f"Could not find {kube_conf}, is the demo running?")
env = {
**os.environ,
"KUBECONFIG": str(kube_conf),
"PATH": os.environ["PATH"] + ":" + str(demo_dir / ".demo"),
}
pods_result = subprocess.check_output(["kubectl", "get", "pods"], env=env)
assert pods_result
yield env
Empty file added tests/db/opensearch/__init__.py
Empty file.
76 changes: 76 additions & 0 deletions tests/db/opensearch/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import annotations

import secrets
import socket
import subprocess

import pytest

from diracx.db.os.utils import BaseOSDB

OPENSEARCH_PORT = 28000


def require_port_availability(port: int) -> bool:
"""Raise an exception if the given port is already in use."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if s.connect_ex(("localhost", port)) == 0:
raise RuntimeError(f"This test requires port {port} to be available")


class DummyOSDB(BaseOSDB):
"""Example DiracX OpenSearch database class for testing.
A new random prefix is created each time the class is defined to ensure
test runs are independent of each other.
"""

mapping = {
"properties": {
"DateField": {"type": "date"},
"IntegerField": {"type": "long"},
"KeywordField1": {"type": "keyword"},
"KeywordField2": {"type": "keyword"},
}
}

def __init__(self, *args, **kwargs):
# Randomize the index prefix to ensure tests are independent
self.index_prefix = f"dummy_{secrets.token_hex(8)}"
super().__init__(*args, **kwargs)

def index_name(self, doc_id: int) -> str:
return f"{self.index_prefix}-{doc_id // 1e6:.0f}m"


@pytest.fixture(scope="session")
def opensearch_conn_kwargs(demo_kubectl_env):
"""Fixture which forwards a port from the diracx-demo and returns the connection kwargs."""
require_port_availability(OPENSEARCH_PORT)
command = [
"kubectl",
"port-forward",
"service/opensearch-cluster-master",
f"{OPENSEARCH_PORT}:9200",
]
with subprocess.Popen(
command, stdout=subprocess.PIPE, universal_newlines=True, env=demo_kubectl_env
) as proc:
for line in proc.stdout:
if line.startswith("Forwarding from"):
yield {
"hosts": f"admin:admin@localhost:{OPENSEARCH_PORT}",
"use_ssl": True,
"verify_certs": False,
}
proc.kill()
break
proc.wait()


@pytest.fixture
async def dummy_opensearch_db(opensearch_conn_kwargs):
"""Fixture which returns a DummyOSDB object."""
db = DummyOSDB(opensearch_conn_kwargs)
async with db.client_context():
yield db
80 changes: 80 additions & 0 deletions tests/db/opensearch/test_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from __future__ import annotations

import pytest

from diracx.db.os.utils import OpenSearchDBUnavailable

from .conftest import OPENSEARCH_PORT, DummyOSDB, require_port_availability


async def _ensure_db_unavailable(db: DummyOSDB):
"""Helper function which raises an exception if we manage to connect to the DB."""
# Normally we would use "async with db.client_context()" but here
# __aenter__ is used explicitly to ensure the exception is raised
# while entering the context manager
acm = db.client_context()
with pytest.raises(OpenSearchDBUnavailable):
await acm.__aenter__()


async def test_connection(dummy_opensearch_db: DummyOSDB):
"""Ensure we can connect to the OpenSearch database."""
assert await dummy_opensearch_db.client.ping()


async def test_connection_error_bad_port(opensearch_conn_kwargs):
"""Check the connection behavior when the DB is unavailable.
This failure mode is emulated by changing the port number.
"""
require_port_availability(28001)
assert f":{OPENSEARCH_PORT}" in opensearch_conn_kwargs["hosts"]
db = DummyOSDB(
{
**opensearch_conn_kwargs,
"hosts": opensearch_conn_kwargs["hosts"].replace(
f":{OPENSEARCH_PORT}", ":28001"
),
}
)
await _ensure_db_unavailable(db)


async def test_connection_error_ssl(opensearch_conn_kwargs):
"""Check the connection behavior when there is an SSL error."""
db = DummyOSDB({**opensearch_conn_kwargs, "use_ssl": False})
await _ensure_db_unavailable(db)


async def test_connection_error_certs(opensearch_conn_kwargs):
"""Check the connection behavior when there is an certificate verification error."""
db = DummyOSDB({**opensearch_conn_kwargs, "verify_certs": True})
await _ensure_db_unavailable(db)


async def test_connection_error_bad_username(opensearch_conn_kwargs):
"""Check the connection behavior when the username is incorrect."""
assert "admin:admin" in opensearch_conn_kwargs["hosts"]
db = DummyOSDB(
{
**opensearch_conn_kwargs,
"hosts": opensearch_conn_kwargs["hosts"].replace(
"admin:admin", "nobody:admin"
),
}
)
await _ensure_db_unavailable(db)


async def test_connection_error_bad_password(opensearch_conn_kwargs):
"""Check the connection behavior when the password is incorrect."""
assert "admin:admin" in opensearch_conn_kwargs["hosts"]
db = DummyOSDB(
{
**opensearch_conn_kwargs,
"hosts": opensearch_conn_kwargs["hosts"].replace(
"admin:admin", "admin:wrong"
),
}
)
await _ensure_db_unavailable(db)

0 comments on commit 8369779

Please sign in to comment.