From bde097adecfb9317a38d6c20800fd14ac3cb9138 Mon Sep 17 00:00:00 2001 From: yelias Date: Thu, 18 Jul 2024 13:33:03 +0300 Subject: [PATCH] [SDK] Add more unit tests for TrainingClient APIs - get_job_pods Signed-off-by: yelias --- .../training/api/training_client_test.py | 135 +++++++++++++++--- .../kubeflow/training/constants/constants.py | 3 + sdk/python/kubeflow/training/utils/utils.py | 2 +- 3 files changed, 123 insertions(+), 17 deletions(-) diff --git a/sdk/python/kubeflow/training/api/training_client_test.py b/sdk/python/kubeflow/training/api/training_client_test.py index e4d76b2766..32cf403b64 100644 --- a/sdk/python/kubeflow/training/api/training_client_test.py +++ b/sdk/python/kubeflow/training/api/training_client_test.py @@ -17,6 +17,9 @@ from kubernetes.client import V1Container from kubernetes.client import V1ResourceRequirements +LIST_RESPONSE = [{"metadata": {"name": "Dummy V1PodList"}}] +TEST_NAME = "test" + def create_namespaced_custom_object_response(*args, **kwargs): if args[2] == "timeout": @@ -25,6 +28,19 @@ def create_namespaced_custom_object_response(*args, **kwargs): raise RuntimeError() +def list_namespaced_pod_response(*args, **kwargs): + class MockResponse: + def get(self, timeout): + # Simulate a response from the Kubernetes API, and pass timeout for verification + LIST_RESPONSE[0]["timeout"] = timeout + if args[0] == "timeout": + raise multiprocessing.TimeoutError() + if args[0] == "runtime": + raise Exception() + return Mock(items=LIST_RESPONSE) + return MockResponse() + + def generate_container() -> V1Container: return V1Container( name="pytorch", @@ -55,7 +71,7 @@ def generate_pytorchjob( def create_job(): - job_namespace = "test" + job_namespace = TEST_NAME container = generate_container() master = KubeflowOrgV1ReplicaSpec( replicas=1, @@ -87,10 +103,10 @@ def __init__(self, kind) -> None: self.kind = kind -test_data = [ +test_data_for_create_job = [ ( "invalid extra parameter", - {"job": create_job(), "namespace": "test", "base_image": "test_image"}, + {"job": create_job(), "namespace": TEST_NAME, "base_image": "test_image"}, ValueError, ), ("invalid job kind", {"job_kind": "invalid_job_kind"}, ValueError), @@ -149,14 +165,14 @@ def __init__(self, kind) -> None: ), ( "valid flow", - {"job": create_job(), "namespace": "test"}, + {"job": create_job(), "namespace": TEST_NAME}, "success", ), ( "valid flow to create job from func", { "name": "test-job", - "namespace": "test", + "namespace": TEST_NAME, "train_func": lambda: print("Test Training Function"), "base_image": "docker.io/test-training", "num_workers": 3, @@ -169,7 +185,7 @@ def __init__(self, kind) -> None: "valid flow to create job using image", { "name": "test-job", - "namespace": "test", + "namespace": TEST_NAME, "base_image": "docker.io/test-training", "num_workers": 2, }, @@ -177,24 +193,91 @@ def __init__(self, kind) -> None: ), ] +test_data_for_get_job_pods = [ + ( + "valid flow with default namespace and default timeout", + { + "name": TEST_NAME, + }, + f"{constants.JOB_NAME_LABEL}={TEST_NAME}", + LIST_RESPONSE + ), + ( + "invalid replica_type", + { + "name": TEST_NAME, + "replica_type": "invalid_replica_type" + }, + "Label not relevant", + ValueError + ), + ( + "invalid replica_type (uppercase)", + { + "name": TEST_NAME, + "replica_type": constants.REPLICA_TYPE_WORKER + }, + "Label not relevant", + ValueError + ), + ( + "valid flow with specific timeout, replica_index, replica_type and master role", + { + "name": TEST_NAME, + "namespace": "test_namespace", + "timeout": 60, + "is_master": True, + "replica_type": constants.REPLICA_TYPE_MASTER.lower(), + "replica_index": 0 + }, + f"{constants.JOB_NAME_LABEL}={TEST_NAME},{constants.JOB_ROLE_LABEL}={constants.JOB_ROLE_MASTER}" + f",{constants.REPLICA_TYPE_LABEL}={constants.REPLICA_TYPE_MASTER.lower()},{constants.REPLICA_INDEX_LABEL}=0", + LIST_RESPONSE + ), + ( + "invalid flow with TimeoutError", + { + "name": TEST_NAME, + "namespace": "timeout", + }, + "Label not relevant", + TimeoutError + ), + ( + "invalid flow with RuntimeError", + { + "name": TEST_NAME, + "namespace": "runtime", + }, + "Label not relevant", + RuntimeError + ) +] + @pytest.fixture def training_client(): with patch( - "kubernetes.client.CustomObjectsApi", - return_value=Mock( - create_namespaced_custom_object=Mock( - side_effect=create_namespaced_custom_object_response - ) - ), - ), patch("kubernetes.client.CoreV1Api", return_value=Mock()), patch( - "kubernetes.config.load_kube_config", return_value=Mock() - ): + "kubernetes.client.CustomObjectsApi", + return_value=Mock( + create_namespaced_custom_object=Mock( + side_effect=create_namespaced_custom_object_response + ) + ), + ), patch("kubernetes.client.CoreV1Api", + return_value=Mock( + list_namespaced_pod=Mock( + side_effect=list_namespaced_pod_response + ) + ) + ), patch("kubernetes.config.load_kube_config", + return_value=Mock() + ): client = TrainingClient(job_kind=constants.PYTORCHJOB_KIND) yield client -@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data) +@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data_for_create_job) def test_create_job(training_client, test_name, kwargs, expected_output): """ test create_job function of training client @@ -206,3 +289,23 @@ def test_create_job(training_client, test_name, kwargs, expected_output): except Exception as e: assert type(e) is expected_output print("test execution complete") + + +@pytest.mark.parametrize("test_name,kwargs,expected_label_selector,expected_output", test_data_for_get_job_pods,) +def test_get_job_pods(training_client, test_name, kwargs, expected_label_selector, expected_output): + """ + test get_job_pods function of training client + """ + print("Executing test:", test_name) + try: + out = training_client.get_job_pods(**kwargs) + # Verify that list_namespaced_pod called with specified arguments + training_client.core_api.list_namespaced_pod.assert_called_with( + kwargs.get("namespace", constants.DEFAULT_NAMESPACE), + label_selector=expected_label_selector, + async_req=True) + assert out[0].pop("timeout") == kwargs.get("timeout", constants.DEFAULT_TIMEOUT) + assert out == expected_output + except Exception as e: + assert type(e) is expected_output + print("test execution complete") \ No newline at end of file diff --git a/sdk/python/kubeflow/training/constants/constants.py b/sdk/python/kubeflow/training/constants/constants.py index 0513c3e31e..76cd1967d0 100644 --- a/sdk/python/kubeflow/training/constants/constants.py +++ b/sdk/python/kubeflow/training/constants/constants.py @@ -22,6 +22,9 @@ # The default PIP index URL to download Python packages. DEFAULT_PIP_INDEX_URL = "https://pypi.org/simple" +# The default namespace in case namespace not define explicitly +DEFAULT_NAMESPACE = "default" + # Annotation to disable Istio sidecar. ISTIO_SIDECAR_INJECTION = "sidecar.istio.io/inject" diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py index 250252677f..04665951de 100644 --- a/sdk/python/kubeflow/training/utils/utils.py +++ b/sdk/python/kubeflow/training/utils/utils.py @@ -69,7 +69,7 @@ def is_running_in_k8s(): def get_default_target_namespace(): if not is_running_in_k8s(): - return "default" + return constants.DEFAULT_NAMESPACE with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f: return f.readline()