Skip to content

Commit

Permalink
Add test to create PyTorchJob from func (#1979)
Browse files Browse the repository at this point in the history
* Add test to create PyTorchJob from func

* Use unique Job name for each test

* Fix job name in test

* Describe PyTorchJob when test fails

* Skip test for gang scheduling
  • Loading branch information
andreyvelich authored Jan 11, 2024
1 parent 778f555 commit 37c7fb2
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
sudo rm -rf /usr/local/lib/android
sudo rm -rf /usr/local/share/powershell
sudo rm -rf /usr/share/swift
echo "Disk usage after cleanup:"
df -h
Expand Down Expand Up @@ -109,7 +109,7 @@ jobs:
- name: Run tests
run: |
pip install pytest
python3 -m pip install -e sdk/python; pytest sdk/python/test --log-cli-level=info --namespace=default
python3 -m pip install -e sdk/python; pytest -s sdk/python/test --log-cli-level=debug --namespace=default
env:
GANG_SCHEDULER_NAME: ${{ matrix.gang-scheduler-name }}

Expand Down
13 changes: 13 additions & 0 deletions sdk/python/kubeflow/training/api/training_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ def __init__(self, kind) -> None:
{"job": create_job(), "namespace": "test"},
"success",
),
(
"valid flow to create job from func",
{
"name": "test-job",
"namespace": "test",
"train_func": lambda: print("Test Training Function"),
"base_image": "docker.io/test-training",
"num_worker_replicas": "3",
"packages_to_install": ["boto3==1.34.14"],
"pip_index_url": "https://pypi.custom.com/simple",
},
"success",
),
]


Expand Down
2 changes: 1 addition & 1 deletion sdk/python/test/e2e/test_e2e_mpijob.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from test.e2e.constants import GANG_SCHEDULERS, NONE_GANG_SCHEDULERS

logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("kubeflow.training.api.training_client").setLevel(logging.DEBUG)

TRAINING_CLIENT = TrainingClient(job_kind=constants.MPIJOB_KIND)
JOB_NAME = "mpijob-mxnet-ci-test"
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/test/e2e/test_e2e_mxjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from test.e2e.constants import GANG_SCHEDULERS, NONE_GANG_SCHEDULERS

logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("kubeflow.training.api.training_client").setLevel(logging.DEBUG)

TRAINING_CLIENT = TrainingClient(job_kind=constants.MXJOB_KIND)
JOB_NAME = "mxjob-mnist-ci-test"
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/test/e2e/test_e2e_paddlejob.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from test.e2e.constants import GANG_SCHEDULERS, NONE_GANG_SCHEDULERS

logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("kubeflow.training.api.training_client").setLevel(logging.DEBUG)

TRAINING_CLIENT = TrainingClient(job_kind=constants.PADDLEJOB_KIND)
JOB_NAME = "paddlejob-cpu-ci-test"
Expand Down
62 changes: 57 additions & 5 deletions sdk/python/test/e2e/test_e2e_pytorchjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@
from test.e2e.constants import GANG_SCHEDULERS, NONE_GANG_SCHEDULERS

logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("kubeflow.training.api.training_client").setLevel(logging.DEBUG)

TRAINING_CLIENT = TrainingClient(job_kind=constants.PYTORCHJOB_KIND)
JOB_NAME = "pytorchjob-mnist-ci-test"
CONTAINER_NAME = "pytorch"
GANG_SCHEDULER_NAME = os.getenv(TEST_GANG_SCHEDULER_NAME_ENV_KEY, "")

Expand All @@ -49,6 +48,7 @@
reason="For gang-scheduling",
)
def test_sdk_e2e_with_gang_scheduling(job_namespace):
JOB_NAME = "pytorchjob-gang-scheduling"
container = generate_container()

master = KubeflowOrgV1ReplicaSpec(
Expand Down Expand Up @@ -81,12 +81,17 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):

unschedulable_pytorchjob = generate_pytorchjob(
job_namespace,
JOB_NAME,
master,
worker,
KubeflowOrgV1SchedulingPolicy(min_available=10),
)
schedulable_pytorchjob = generate_pytorchjob(
job_namespace, master, worker, KubeflowOrgV1SchedulingPolicy(min_available=2)
job_namespace,
JOB_NAME,
master,
worker,
KubeflowOrgV1SchedulingPolicy(min_available=2),
)

TRAINING_CLIENT.create_job(job=unschedulable_pytorchjob, namespace=job_namespace)
Expand Down Expand Up @@ -120,6 +125,7 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
reason="For plain scheduling",
)
def test_sdk_e2e(job_namespace):
JOB_NAME = "pytorchjob-e2e"
container = generate_container()

master = KubeflowOrgV1ReplicaSpec(
Expand All @@ -144,7 +150,7 @@ def test_sdk_e2e(job_namespace):
),
)

pytorchjob = generate_pytorchjob(job_namespace, master, worker)
pytorchjob = generate_pytorchjob(job_namespace, JOB_NAME, master, worker)

TRAINING_CLIENT.create_job(job=pytorchjob, namespace=job_namespace)
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
Expand All @@ -161,16 +167,62 @@ def test_sdk_e2e(job_namespace):
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)


@pytest.mark.skipif(
GANG_SCHEDULER_NAME in GANG_SCHEDULERS,
reason="For plain scheduling",
)
def test_sdk_e2e_create_from_func(job_namespace):
JOB_NAME = "pytorchjob-from-func"

def train_func():
import time

for i in range(10):
print(f"Start training for Epoch {i}")
time.sleep(1)

num_workers = 1

TRAINING_CLIENT.create_job(
name=JOB_NAME,
namespace=job_namespace,
train_func=train_func,
num_worker_replicas=num_workers,
)

logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

try:
utils.verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"PyTorchJob create from function E2E fails. Exception: {e}")

# Verify that PyTorchJob has correct pods.
pod_names = TRAINING_CLIENT.get_job_pod_names(
name=JOB_NAME, namespace=job_namespace
)

if len(pod_names) != num_workers or f"{JOB_NAME}-worker-0" not in pod_names:
raise Exception(f"PyTorchJob has incorrect pods: {pod_names}")

utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)


def generate_pytorchjob(
job_namespace: str,
job_name: str,
master: KubeflowOrgV1ReplicaSpec,
worker: KubeflowOrgV1ReplicaSpec,
scheduling_policy: Optional[KubeflowOrgV1SchedulingPolicy] = None,
) -> KubeflowOrgV1PyTorchJob:
return KubeflowOrgV1PyTorchJob(
api_version=constants.API_VERSION,
kind=constants.PYTORCHJOB_KIND,
metadata=V1ObjectMeta(name=JOB_NAME, namespace=job_namespace),
metadata=V1ObjectMeta(name=job_name, namespace=job_namespace),
spec=KubeflowOrgV1PyTorchJobSpec(
run_policy=KubeflowOrgV1RunPolicy(
clean_pod_policy="None",
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/test/e2e/test_e2e_tfjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
from test.e2e.constants import GANG_SCHEDULERS, NONE_GANG_SCHEDULERS

logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("kubeflow.training.api.training_client").setLevel(logging.DEBUG)


TRAINING_CLIENT = TrainingClient(job_kind=constants.TFJOB_KIND)
JOB_NAME = "tfjob-mnist-ci-test"
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/test/e2e/test_e2e_xgboostjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from test.e2e.constants import GANG_SCHEDULERS, NONE_GANG_SCHEDULERS

logging.basicConfig(format="%(message)s")
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("kubeflow.training.api.training_client").setLevel(logging.DEBUG)

TRAINING_CLIENT = TrainingClient(job_kind=constants.XGBOOSTJOB_KIND)
JOB_NAME = "xgboostjob-iris-ci-test"
Expand Down

0 comments on commit 37c7fb2

Please sign in to comment.