Skip to content

Commit

Permalink
Fix Job kind in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
andreyvelich committed Sep 8, 2023
1 parent dbf3d44 commit 1fd7407
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 192 deletions.
20 changes: 10 additions & 10 deletions sdk/python/kubeflow/training/api/training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ def create_job(
# Create the Training Job.
try:
self.custom_api.create_namespaced_custom_object(
constants.KUBEFLOW_GROUP,
constants.OPERATOR_VERSION,
constants.GROUP,
constants.VERSION,
namespace,
constants.JOB_PARAMETERS[job.kind]["plural"],
job,
Expand Down Expand Up @@ -252,8 +252,8 @@ def get_job(

try:
thread = self.custom_api.get_namespaced_custom_object(
constants.KUBEFLOW_GROUP,
constants.OPERATOR_VERSION,
constants.GROUP,
constants.VERSION,
namespace,
constants.JOB_PARAMETERS[job_kind]["plural"],
name,
Expand Down Expand Up @@ -307,8 +307,8 @@ def list_jobs(
result = []
try:
thread = self.custom_api.list_namespaced_custom_object(
constants.KUBEFLOW_GROUP,
constants.OPERATOR_VERSION,
constants.GROUP,
constants.VERSION,
namespace,
constants.JOB_PARAMETERS[job_kind]["plural"],
async_req=True,
Expand Down Expand Up @@ -889,8 +889,8 @@ def update_job(

try:
self.custom_api.patch_namespaced_custom_object(
constants.KUBEFLOW_GROUP,
constants.OPERATOR_VERSION,
constants.GROUP,
constants.VERSION,
namespace,
constants.JOB_PARAMETERS[job_kind]["plural"],
name,
Expand Down Expand Up @@ -931,8 +931,8 @@ def delete_job(

try:
self.custom_api.delete_namespaced_custom_object(
constants.KUBEFLOW_GROUP,
constants.OPERATOR_VERSION,
constants.GROUP,
constants.VERSION,
namespace,
constants.JOB_PARAMETERS[job_kind]["plural"],
name=name,
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/kubeflow/training/constants/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
DEFAULT_TIMEOUT = 120

# Common constants.
KUBEFLOW_GROUP = "kubeflow.org"
OPERATOR_VERSION = "v1"
API_VERSION = f"{KUBEFLOW_GROUP}/{OPERATOR_VERSION}"
GROUP = "kubeflow.org"
VERSION = "v1"
API_VERSION = f"{GROUP}/{VERSION}"
ISTIO_SIDECAR_INJECTION = "sidecar.istio.io/inject"

# Training Job conditions.
Expand Down
37 changes: 10 additions & 27 deletions sdk/python/test/e2e/test_e2e_mpijob.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)

TRAINING_CLIENT = TrainingClient()
TRAINING_CLIENT = TrainingClient(job_kind=constants.MPIJOB_KIND)
JOB_NAME = "mpijob-mxnet-ci-test"
CONTAINER_NAME = "mpi"
GANG_SCHEDULER_NAME = os.getenv(TEST_GANG_SCHEDULER_NAME_ENV_KEY, "")
Expand Down Expand Up @@ -91,29 +91,18 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
)

TRAINING_CLIENT.create_job(job=mpijob, namespace=job_namespace)
logging.info(f"List of created {constants.MPIJOB_KIND}s")
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_unschedulable_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.MPIJOB_KIND,
)
verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)

TRAINING_CLIENT.update_job(patched_mpijob, JOB_NAME, job_namespace)
logging.info(f"List of patched {constants.MPIJOB_KIND}s")
logging.info(f"List of patched {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.MPIJOB_KIND,
timeout=900,
)
verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, timeout=900)

TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME)


@pytest.mark.skipif(
Expand Down Expand Up @@ -148,16 +137,10 @@ def test_sdk_e2e(job_namespace):
mpijob = generate_mpijob(job_namespace, launcher, worker)

TRAINING_CLIENT.create_job(job=mpijob, namespace=job_namespace)
logging.info(f"List of created {constants.MPIJOB_KIND}s")
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.MPIJOB_KIND,
timeout=900,
)
verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, timeout=900)

TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)

Expand All @@ -169,8 +152,8 @@ def generate_mpijob(
scheduling_policy: Optional[KubeflowOrgV1SchedulingPolicy] = None,
) -> KubeflowOrgV1MPIJob:
return KubeflowOrgV1MPIJob(
api_version="kubeflow.org/v1",
kind="MPIJob",
api_version=constants.API_VERSION,
kind=constants.MPIJOB_KIND,
metadata=V1ObjectMeta(name=JOB_NAME, namespace=job_namespace),
spec=KubeflowOrgV1MPIJobSpec(
slots_per_worker=1,
Expand Down
35 changes: 9 additions & 26 deletions sdk/python/test/e2e/test_e2e_mxjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)

TRAINING_CLIENT = TrainingClient()
TRAINING_CLIENT = TrainingClient(constants.MXJOB_KIND)
JOB_NAME = "mxjob-mnist-ci-test"
CONTAINER_NAME = "mxnet"
GANG_SCHEDULER_NAME = os.getenv(TEST_GANG_SCHEDULER_NAME_ENV_KEY, "")
Expand Down Expand Up @@ -114,27 +114,16 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
)

TRAINING_CLIENT.create_job(job=unschedulable_mxjob, namespace=job_namespace)
logging.info(f"List of created {constants.MXJOB_KIND}s")
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_unschedulable_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.MXJOB_KIND,
)
verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)

TRAINING_CLIENT.update_job(schedulable_mxjob, JOB_NAME, job_namespace)
logging.info(f"List of patched {constants.MXJOB_KIND}s")
logging.info(f"List of patched {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.MXJOB_KIND,
timeout=900,
)
verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, timeout=900)

TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)

Expand Down Expand Up @@ -182,16 +171,10 @@ def test_sdk_e2e(job_namespace):
mxjob = generate_mxjob(job_namespace, scheduler, server, worker)

TRAINING_CLIENT.create_job(job=mxjob, namespace=job_namespace)
logging.info(f"List of created {constants.MXJOB_KIND}s")
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.MXJOB_KIND,
timeout=900,
)
verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, timeout=900)

TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)

Expand All @@ -204,8 +187,8 @@ def generate_mxjob(
scheduling_policy: Optional[KubeflowOrgV1SchedulingPolicy] = None,
) -> KubeflowOrgV1MXJob:
return KubeflowOrgV1MXJob(
api_version="kubeflow.org/v1",
kind="MXJob",
api_version=constants.API_VERSION,
kind=constants.MXJOB_KIND,
metadata=V1ObjectMeta(name=JOB_NAME, namespace=job_namespace),
spec=KubeflowOrgV1MXJobSpec(
job_mode="MXTrain",
Expand Down
35 changes: 9 additions & 26 deletions sdk/python/test/e2e/test_e2e_paddlejob.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)

TRAINING_CLIENT = TrainingClient()
TRAINING_CLIENT = TrainingClient(job_kind=constants.PADDLEJOB_KIND)
JOB_NAME = "paddlejob-cpu-ci-test"
CONTAINER_NAME = "paddle"
GANG_SCHEDULER_NAME = os.getenv(TEST_GANG_SCHEDULER_NAME_ENV_KEY, "")
Expand Down Expand Up @@ -77,27 +77,16 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
)

TRAINING_CLIENT.create_job(job=unschedulable_paddlejob, namespace=job_namespace)
logging.info(f"List of created {constants.PADDLEJOB_KIND}s")
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_unschedulable_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.PADDLEJOB_KIND,
)
verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)

TRAINING_CLIENT.update_job(schedulable_paddlejob, JOB_NAME, job_namespace)
logging.info(f"List of patched {constants.PADDLEJOB_KIND}s")
logging.info(f"List of patched {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.PADDLEJOB_KIND,
timeout=900,
)
verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, timeout=900)

TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)

Expand All @@ -123,16 +112,10 @@ def test_sdk_e2e(job_namespace):
paddlejob = generate_paddlejob(job_namespace, worker)

TRAINING_CLIENT.create_job(job=paddlejob, namespace=job_namespace)
logging.info(f"List of created {constants.PADDLEJOB_KIND}s")
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.PADDLEJOB_KIND,
timeout=900,
)
verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, timeout=900)

TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)

Expand All @@ -143,8 +126,8 @@ def generate_paddlejob(
scheduling_policy: Optional[KubeflowOrgV1SchedulingPolicy] = None,
) -> KubeflowOrgV1PaddleJob:
return KubeflowOrgV1PaddleJob(
api_version="kubeflow.org/v1",
kind="PaddleJob",
api_version=constants.API_VERSION,
kind=constants.PADDLEJOB_KIND,
metadata=V1ObjectMeta(name=JOB_NAME, namespace=job_namespace),
spec=KubeflowOrgV1PaddleJobSpec(
run_policy=KubeflowOrgV1RunPolicy(
Expand Down
35 changes: 9 additions & 26 deletions sdk/python/test/e2e/test_e2e_pytorchjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)

TRAINING_CLIENT = TrainingClient()
TRAINING_CLIENT = TrainingClient(job_kind=constants.PYTORCHJOB_KIND)
JOB_NAME = "pytorchjob-mnist-ci-test"
CONTAINER_NAME = "pytorch"
GANG_SCHEDULER_NAME = os.getenv(TEST_GANG_SCHEDULER_NAME_ENV_KEY, "")
Expand Down Expand Up @@ -94,27 +94,16 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
)

TRAINING_CLIENT.create_job(job=unschedulable_pytorchjob, namespace=job_namespace)
logging.info(f"List of created {constants.PYTORCHJOB_KIND}s")
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_unschedulable_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.PYTORCHJOB_KIND,
)
verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)

TRAINING_CLIENT.update_job(schedulable_pytorchjob, JOB_NAME, job_namespace)
logging.info(f"List of patched {constants.PYTORCHJOB_KIND}s")
logging.info(f"List of patched {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.PYTORCHJOB_KIND,
timeout=900,
)
verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, timeout=900)

TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)

Expand Down Expand Up @@ -151,16 +140,10 @@ def test_sdk_e2e(job_namespace):
pytorchjob = generate_pytorchjob(job_namespace, master, worker)

TRAINING_CLIENT.create_job(job=pytorchjob, namespace=job_namespace)
logging.info(f"List of created {constants.PYTORCHJOB_KIND}s")
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(
TRAINING_CLIENT,
JOB_NAME,
job_namespace,
constants.PYTORCHJOB_KIND,
timeout=900,
)
verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, timeout=900)

TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)

Expand All @@ -172,8 +155,8 @@ def generate_pytorchjob(
scheduling_policy: Optional[KubeflowOrgV1SchedulingPolicy] = None,
) -> KubeflowOrgV1PyTorchJob:
return KubeflowOrgV1PyTorchJob(
api_version="kubeflow.org/v1",
kind="PyTorchJob",
api_version=constants.API_VERSION,
kind=constants.PYTORCHJOB_KIND,
metadata=V1ObjectMeta(name=JOB_NAME, namespace=job_namespace),
spec=KubeflowOrgV1PyTorchJobSpec(
run_policy=KubeflowOrgV1RunPolicy(
Expand Down
Loading

0 comments on commit 1fd7407

Please sign in to comment.