Skip to content

Commit

Permalink
[SDK] Add more unit tests for TrainingClient APIs - get_job_pods
Browse files Browse the repository at this point in the history
Signed-off-by: yelias <[email protected]>
  • Loading branch information
yelias committed Jul 18, 2024
1 parent 54b5804 commit bde097a
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 17 deletions.
135 changes: 119 additions & 16 deletions sdk/python/kubeflow/training/api/training_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from kubernetes.client import V1Container
from kubernetes.client import V1ResourceRequirements

# Canned pod list returned (as `items`) by the mocked list_namespaced_pod call.
# The mock also stores the timeout it received under a "timeout" key on the
# first entry so tests can verify which timeout was passed through.
LIST_RESPONSE = [{"metadata": {"name": "Dummy V1PodList"}}]
# Shared placeholder value used for job names and namespaces in the test data.
TEST_NAME = "test"


def create_namespaced_custom_object_response(*args, **kwargs):
if args[2] == "timeout":
Expand All @@ -25,6 +28,19 @@ def create_namespaced_custom_object_response(*args, **kwargs):
raise RuntimeError()


def list_namespaced_pod_response(*args, **kwargs):
    """
    Side effect for the mocked ``list_namespaced_pod`` call.

    Returns an object exposing ``get(timeout)``. The first positional
    argument of the mocked call (the namespace) selects what ``get`` does:

    * ``"timeout"`` -> raises ``multiprocessing.TimeoutError``
    * ``"runtime"`` -> raises a bare ``Exception``
    * anything else -> returns a ``Mock`` whose ``items`` is ``LIST_RESPONSE``

    In every case the timeout passed to ``get`` is first recorded on
    ``LIST_RESPONSE[0]["timeout"]`` so the caller can verify it afterwards.
    """

    class _PendingPodList:
        def get(self, timeout):
            # Record the timeout before deciding the outcome, so it is stored
            # even when the call ends up raising.
            LIST_RESPONSE[0]["timeout"] = timeout
            requested_namespace = args[0]
            if requested_namespace == "timeout":
                raise multiprocessing.TimeoutError()
            elif requested_namespace == "runtime":
                raise Exception()
            return Mock(items=LIST_RESPONSE)

    return _PendingPodList()


def generate_container() -> V1Container:
return V1Container(
name="pytorch",
Expand Down Expand Up @@ -55,7 +71,7 @@ def generate_pytorchjob(


def create_job():
job_namespace = "test"
job_namespace = TEST_NAME
container = generate_container()
master = KubeflowOrgV1ReplicaSpec(
replicas=1,
Expand Down Expand Up @@ -87,10 +103,10 @@ def __init__(self, kind) -> None:
self.kind = kind


test_data = [
test_data_for_create_job = [
(
"invalid extra parameter",
{"job": create_job(), "namespace": "test", "base_image": "test_image"},
{"job": create_job(), "namespace": TEST_NAME, "base_image": "test_image"},
ValueError,
),
("invalid job kind", {"job_kind": "invalid_job_kind"}, ValueError),
Expand Down Expand Up @@ -149,14 +165,14 @@ def __init__(self, kind) -> None:
),
(
"valid flow",
{"job": create_job(), "namespace": "test"},
{"job": create_job(), "namespace": TEST_NAME},
"success",
),
(
"valid flow to create job from func",
{
"name": "test-job",
"namespace": "test",
"namespace": TEST_NAME,
"train_func": lambda: print("Test Training Function"),
"base_image": "docker.io/test-training",
"num_workers": 3,
Expand All @@ -169,32 +185,99 @@ def __init__(self, kind) -> None:
"valid flow to create job using image",
{
"name": "test-job",
"namespace": "test",
"namespace": TEST_NAME,
"base_image": "docker.io/test-training",
"num_workers": 2,
},
"success",
),
]

# Parametrized cases for test_get_job_pods. Each tuple is
# (test_name, kwargs passed to get_job_pods, expected label_selector,
#  expected output).
# When the expected output is an exception class, the test never reaches the
# label-selector check once the exception is raised, so those rows carry the
# placeholder string "Label not relevant".
test_data_for_get_job_pods = [
(
"valid flow with default namespace and default timeout",
{
"name": TEST_NAME,
},
f"{constants.JOB_NAME_LABEL}={TEST_NAME}",
LIST_RESPONSE
),
(
"invalid replica_type",
{
"name": TEST_NAME,
"replica_type": "invalid_replica_type"
},
"Label not relevant",
ValueError
),
(
# NOTE(review): presumably constants.REPLICA_TYPE_WORKER is not lower-case,
# which is why it is expected to be rejected here while the valid case
# below lower-cases REPLICA_TYPE_MASTER -- confirm against constants.
"invalid replica_type (uppercase)",
{
"name": TEST_NAME,
"replica_type": constants.REPLICA_TYPE_WORKER
},
"Label not relevant",
ValueError
),
(
"valid flow with specific timeout, replica_index, replica_type and master role",
{
"name": TEST_NAME,
"namespace": "test_namespace",
"timeout": 60,
"is_master": True,
"replica_type": constants.REPLICA_TYPE_MASTER.lower(),
"replica_index": 0
},
# Expected selector: job name, master role, replica type and replica index,
# joined with commas (two adjacent f-strings form a single string).
f"{constants.JOB_NAME_LABEL}={TEST_NAME},{constants.JOB_ROLE_LABEL}={constants.JOB_ROLE_MASTER}"
f",{constants.REPLICA_TYPE_LABEL}={constants.REPLICA_TYPE_MASTER.lower()},{constants.REPLICA_INDEX_LABEL}=0",
LIST_RESPONSE
),
(
"invalid flow with TimeoutError",
{
"name": TEST_NAME,
# The namespace "timeout" makes the mocked list_namespaced_pod raise
# multiprocessing.TimeoutError; the test expects get_job_pods to surface
# that as the built-in TimeoutError.
"namespace": "timeout",
},
"Label not relevant",
TimeoutError
),
(
"invalid flow with RuntimeError",
{
"name": TEST_NAME,
# The namespace "runtime" makes the mocked list_namespaced_pod raise a bare
# Exception; the test expects get_job_pods to surface that as RuntimeError.
"namespace": "runtime",
},
"Label not relevant",
RuntimeError
)
]


@pytest.fixture
def training_client():
with patch(
"kubernetes.client.CustomObjectsApi",
return_value=Mock(
create_namespaced_custom_object=Mock(
side_effect=create_namespaced_custom_object_response
)
),
), patch("kubernetes.client.CoreV1Api", return_value=Mock()), patch(
"kubernetes.config.load_kube_config", return_value=Mock()
):
"kubernetes.client.CustomObjectsApi",
return_value=Mock(
create_namespaced_custom_object=Mock(
side_effect=create_namespaced_custom_object_response
)
),
), patch("kubernetes.client.CoreV1Api",
return_value=Mock(
list_namespaced_pod=Mock(
side_effect=list_namespaced_pod_response
)
)
), patch("kubernetes.config.load_kube_config",
return_value=Mock()
):
client = TrainingClient(job_kind=constants.PYTORCHJOB_KIND)
yield client


@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data)
@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data_for_create_job)
def test_create_job(training_client, test_name, kwargs, expected_output):
"""
test create_job function of training client
Expand All @@ -206,3 +289,23 @@ def test_create_job(training_client, test_name, kwargs, expected_output):
except Exception as e:
assert type(e) is expected_output
print("test execution complete")


@pytest.mark.parametrize(
    "test_name,kwargs,expected_label_selector,expected_output",
    test_data_for_get_job_pods,
)
def test_get_job_pods(
    training_client, test_name, kwargs, expected_label_selector, expected_output
):
    """
    test get_job_pods function of training client

    Args:
        training_client: fixture providing a TrainingClient with mocked
            Kubernetes APIs.
        test_name: human-readable case name, printed for log readability.
        kwargs: keyword arguments forwarded to ``get_job_pods``.
        expected_label_selector: label selector the client is expected to
            pass to ``list_namespaced_pod`` (ignored for error cases).
        expected_output: either the expected list of pods, or the exact
            exception class that ``get_job_pods`` must raise.
    """
    print("Executing test:", test_name)

    expects_error = isinstance(expected_output, type) and issubclass(
        expected_output, Exception
    )

    if expects_error:
        # Only the call under test is allowed to raise. Keeping the
        # verification asserts out of this branch stops an assertion failure
        # from being mistaken for (or hidden behind) the expected exception.
        with pytest.raises(expected_output) as exc_info:
            training_client.get_job_pods(**kwargs)
        # pytest.raises also accepts subclasses; keep the exact-type check.
        assert type(exc_info.value) is expected_output
    else:
        out = training_client.get_job_pods(**kwargs)
        # Verify that list_namespaced_pod called with specified arguments
        training_client.core_api.list_namespaced_pod.assert_called_with(
            kwargs.get("namespace", constants.DEFAULT_NAMESPACE),
            label_selector=expected_label_selector,
            async_req=True,
        )
        # The mock stored the timeout it received on the first item; pop it so
        # the timeout is verified and the item matches the canned response.
        assert out[0].pop("timeout") == kwargs.get(
            "timeout", constants.DEFAULT_TIMEOUT
        )
        assert out == expected_output

    print("test execution complete")
3 changes: 3 additions & 0 deletions sdk/python/kubeflow/training/constants/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
# The default PIP index URL to download Python packages.
DEFAULT_PIP_INDEX_URL = "https://pypi.org/simple"

# The default namespace, used when no namespace is specified explicitly.
DEFAULT_NAMESPACE = "default"

# Annotation to disable Istio sidecar.
ISTIO_SIDECAR_INJECTION = "sidecar.istio.io/inject"

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/kubeflow/training/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def is_running_in_k8s():

def get_default_target_namespace():
if not is_running_in_k8s():
return "default"
return constants.DEFAULT_NAMESPACE
with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f:
return f.readline()

Expand Down

0 comments on commit bde097a

Please sign in to comment.