Skip to content

Commit

Permalink
[SDK] Fix Worker and Master templates for PyTorchJob
Browse files Browse the repository at this point in the history
  • Loading branch information
andreyvelich committed Jan 12, 2024
1 parent 521cbed commit de55a09
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 59 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test-python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ jobs:
run: |
pip install pytest python-dateutil urllib3 kubernetes
pip install -U './sdk/python[huggingface]'
- name: Run unit test for training sdk
run: pytest ./sdk/python/kubeflow/training/api/training_client_test.py
run: pytest ./sdk/python/kubeflow/training/api/training_client_test.py
43 changes: 23 additions & 20 deletions docs/proposals/train_api_proposal.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

**<h3>Authors:</h3>**

* Deepanker Gupta (**[@deepanker13](https://github.com/deepanker13)**), Nutanix
* Johnu George (**[@johnugeorge](https://github.com/johnugeorge)**), Nutanix
- Deepanker Gupta (**[@deepanker13](https://github.com/deepanker13)**), Nutanix
- Johnu George (**[@johnugeorge](https://github.com/johnugeorge)**), Nutanix

**<h3>Status</h3>**

* 10 Nov 2023 (v1)
* 30 Nov 2023 (v2)
- 10 Nov 2023 (v1)
- 30 Nov 2023 (v2)

**<h3>Goals</h3>**

1. To add a higher level train api for fine-tuning/training LLMs.

**<h3>Non Goals / Limitations</h3>**

1. The dataset is assumed to be preprocessed by the user.

2. Currently only pytorch framework will be supported for running distributed training.
Expand All @@ -30,7 +32,7 @@ LLMs are being widely used for generative AI tasks and as their adoption is incr

**<h3>Background</h3>**

Currently, there are two flows for data scientists to start using Training operator for their distributed training needs.
Currently, there are two flows for data scientists to start using Training operator for their distributed training needs.

**<h4>Traditional method</h4>**

Expand All @@ -48,7 +50,7 @@ To provide a better user experience, a new higher level SDK was added in[ https:
training_client.create_job(
name=pytorchjob_name,
train_func=train_function,
num_worker_replicas=3, # How many PyTorch Workers will be created.
num_workers=3, # How many PyTorch Workers will be created.
)
```

Expand All @@ -67,16 +69,16 @@ dataset_args = datasetProviderClass()
# Arguments related to the trainer code
parameters = {key:value pairs}
trainingClient.train(
num_workers=1,
num_workers=1,
num_procs_per_worker = 1,
resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"},
model_args,
dataset_args,
resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"},
model_args,
dataset_args,
parameters
)
```

Example:
Example:

```python
@dataclass
Expand All @@ -98,16 +100,16 @@ class HuggingFaceTrainParams:
transformerClass = field()

trainingClient.train(
num_workers=1,
num_workers=1,
num_procs_per_worker = 1,
resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"},
resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"},
HuggingFaceModelParams(model='hf://openchat/openchat_3.5', access_token = "hf_..." ),
S3DatasetParams(dataset= 's3://doc-example-bucket1/train_dataset', eval_dataset = "s3://doc-example-bucket1/eval_dataset", access_token = "s3 access token", region="us-west-2"),
S3DatasetParams(dataset= 's3://doc-example-bucket1/train_dataset', eval_dataset = "s3://doc-example-bucket1/eval_dataset", access_token = "s3 access token", region="us-west-2"),
HuggingFaceTrainParams(learning_rate=0.1, transformerClass="Trainer", peft_config = {})
)
```

The new proposed API takes following arguments
The new proposed API takes following arguments

1. System parameters - Number of workers, number of resources per workers(GPUs per worker).
2. Model parameters - Model provider and repository details.
Expand All @@ -116,7 +118,7 @@ The new proposed API takes following arguments

**<h3>Implementation</h3>**

1. Setup **init** **containers** that download the model and dataset to a PVC. Based on the specified model provider, corresponding training utility functions will be used. Eg: For Huggingface provider, Huggingface trainer can be used. For this **get_pytorchjob_template** function in the sdk needs to be changed to add init containers spec.. Inorder to download models and data sets, we need to support different providers like kaggle, hugging face, s3 or git lfs. The data can be stored in a shared volume between the init container and the main container.<br /> <br /> This way to download models allows using ReadWriteOnce and ReadOnlyMany PVCs. If we adopt the way of creating batch/v1 Job to download models to PVC, we need to force users to prepare ReadWriteOnce and ReadOnlyMany PVCs.<br /> <br /> A new folder containing the code for downloading model and dataset can be added to generate the images for init_containers. Abstract classes will be used as base to create when dataset download, model download and training loop is written for base images. <br /> These parameters will be passed as container args or environment variables.
1. Setup **init** **containers** that download the model and dataset to a PVC. Based on the specified model provider, corresponding training utility functions will be used. Eg: For Huggingface provider, Huggingface trainer can be used. For this **get_pytorchjob_template** function in the sdk needs to be changed to add init containers spec.. Inorder to download models and data sets, we need to support different providers like kaggle, hugging face, s3 or git lfs. The data can be stored in a shared volume between the init container and the main container.<br /> <br /> This way to download models allows using ReadWriteOnce and ReadOnlyMany PVCs. If we adopt the way of creating batch/v1 Job to download models to PVC, we need to force users to prepare ReadWriteOnce and ReadOnlyMany PVCs.<br /> <br /> A new folder containing the code for downloading model and dataset can be added to generate the images for init_containers. Abstract classes will be used as base to create when dataset download, model download and training loop is written for base images. <br /> These parameters will be passed as container args or environment variables.

```
sdk/python
Expand All @@ -129,12 +131,13 @@ sdk/python
-> storage.py #this is the file which will be invoked from the dockerfile
-> Dockerfile
```

```python
# code present in abstract_model_provider.py
class modelProvider():
@abstractmethod
def load_config(self):
pass
pass

@abstractmethod
def download_model(self):
Expand All @@ -154,9 +157,9 @@ class HuggingFace(modelProvider):
training_client.create_job(name="pytorchjob_name",train_func=custom_training_function, num_of_nodes=1, gpus_per_node = 4)
```

3. We can provide the training function as a **custom_training_function** argument or inside the **base_image** argument of the **create_job** API directly. In case of Hugging Face models, we can use Hugging Face Transformer library’s Trainer class as the training function.
3. We can provide the training function as a **custom_training_function** argument or inside the **base_image** argument of the **create_job** API directly. In case of Hugging Face models, we can use Hugging Face Transformer library’s Trainer class as the training function.

4. The launch command of the training job needs to be changed to torchrun to take **nnodes** and **nproc_per_node** into effect inside **get_pod_template_spec** function in the training operator SDK.
4. The launch command of the training job needs to be changed to torchrun to take **nnodes** and **nproc_per_node** into effect inside **get_pod_template_spec** function in the training operator SDK.

```python
exec_script = textwrap.dedent(
Expand All @@ -169,4 +172,4 @@ exec_script = textwrap.dedent(
"torchrun", "$program_path/ephemeral_script.py\
""""
)
```
```
2 changes: 1 addition & 1 deletion examples/sdk/create-pytorchjob-from-func.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@
"training_client.create_job(\n",
" name=pytorchjob_name,\n",
" train_func=train_pytorch_model,\n",
" num_worker_replicas=3, # How many PyTorch Workers will be created.\n",
" num_workers=3, # How many PyTorch Workers will be created.\n",
")"
]
},
Expand Down
31 changes: 18 additions & 13 deletions sdk/python/kubeflow/training/api/training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,18 @@ def __init__(

def train(
self,
name: str = None,
namespace: str = None,
name: str,
namespace: Optional[str] = None,
num_workers: int = 1,
num_procs_per_worker: int = 1,
storage_config: Dict[str, str] = {"size": "10Gi", "storage_class": None},
storage_config: Dict[str, Optional[str]] = {
"size": "10Gi",
"storage_class": None,
},
model_provider_parameters=None,
dataset_provider_parameters=None,
train_parameters=None,
resources_per_worker: Union[dict, client.V1ResourceRequirements, None] = None,
# Dict[Literal["gpu", "cpu", "memory"], any] = None,
):
"""
Higher level train api
Expand All @@ -116,8 +118,9 @@ def train(
import peft
import transformers
except ImportError:
print(
"train api dependencies not installed. Run pip install -U 'kubeflow-training[huggingface]' "
raise ImportError(
"Train API dependencies not installed. "
+ "Run: pip install -U 'kubeflow-training[huggingface]' "
)
from kubeflow.storage_initializer.s3 import S3DatasetParams
from kubeflow.storage_initializer.hugging_face import (
Expand Down Expand Up @@ -274,7 +277,7 @@ def train(
namespace=namespace,
master_pod_template_spec=master_pod_template_spec,
worker_pod_template_spec=worker_pod_template_spec,
num_worker_replicas=num_workers - 1,
num_workers=num_workers,
num_procs_per_worker=num_procs_per_worker,
)

Expand All @@ -289,7 +292,7 @@ def create_job(
base_image: Optional[str] = None,
train_func: Optional[Callable] = None,
parameters: Optional[Dict[str, Any]] = None,
num_worker_replicas: Optional[int] = None,
num_workers: Optional[int] = None,
num_chief_replicas: Optional[int] = None,
num_ps_replicas: Optional[int] = None,
packages_to_install: Optional[List[str]] = None,
Expand Down Expand Up @@ -320,7 +323,7 @@ def create_job(
argument to define input parameters for the function. If `train_func` is
set, Base Image must support `bash` CLI to execute the training script.
parameters: Dict of input parameters that training function might receive.
num_worker_replicas: Number of Worker replicas for the Job.
num_workers: Number of Worker replicas for the Job.
num_chief_replicas: Number of Chief replicas for the TFJob. Number
of Chief replicas can't be more than 1.
num_ps_replicas: Number of Parameter Server replicas for the TFJob.
Expand Down Expand Up @@ -382,20 +385,21 @@ def create_job(
name=name,
namespace=namespace,
pod_template_spec=pod_template_spec,
num_worker_replicas=num_worker_replicas,
num_workers=num_workers,
num_chief_replicas=num_chief_replicas,
num_ps_replicas=num_ps_replicas,
)
elif job_kind == constants.PYTORCHJOB_KIND:
elif job_kind == constants.PYTORCHJOB_KIND and num_workers:
job = utils.get_pytorchjob_template(
name=name,
namespace=namespace,
worker_pod_template_spec=pod_template_spec,
num_worker_replicas=num_worker_replicas,
num_workers=num_workers,
)
else:
raise ValueError(
f"Job kind {job_kind} can't be created using function or image"
f"Job kind {job_kind} can't be created using function or image. "
+ "Number of Workers must be set."
)

# Verify Job object type.
Expand Down Expand Up @@ -1052,6 +1056,7 @@ def get_job_logs(
timeout: Optional, Kubernetes API server timeout in seconds
to execute the request.
verbose: Whether to get Kubernetes events for Job and corresponding pods.
If you need to get events from all PyTorchJob's Pods, set `isMaster = False`.
Returns:
Dict[str, str]: A dictionary in which the keys are pod names and the
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/kubeflow/training/api/training_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def __init__(self, kind) -> None:
"namespace": "test",
"train_func": lambda: print("Test Training Function"),
"base_image": "docker.io/test-training",
"num_worker_replicas": "3",
"num_workers": 3,
"packages_to_install": ["boto3==1.34.14"],
"pip_index_url": "https://pypi.custom.com/simple",
},
Expand Down Expand Up @@ -203,5 +203,5 @@ def test_create_job(training_client, test_name, kwargs, expected_output):
training_client.create_job(**kwargs)
assert expected_output == "success"
except Exception as e:
assert type(e) == expected_output
assert type(e) is expected_output
print("test execution complete")
47 changes: 28 additions & 19 deletions sdk/python/kubeflow/training/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,14 @@ def get_tfjob_template(
name: str,
namespace: str,
pod_template_spec: models.V1PodTemplateSpec,
num_worker_replicas: Optional[int] = None,
num_workers: Optional[int] = None,
num_chief_replicas: Optional[int] = None,
num_ps_replicas: Optional[int] = None,
):
# Check if at least one replica is set.
# TODO (andreyvelich): Remove this check once we have CEL validation.
# Ref: https://github.com/kubeflow/training-operator/issues/1708
if (
num_worker_replicas is None
and num_chief_replicas is None
and num_ps_replicas is None
):
if num_workers is None and num_chief_replicas is None and num_ps_replicas is None:
raise ValueError("At least one replica for TFJob must be set")

# Create TFJob template.
Expand Down Expand Up @@ -293,11 +289,11 @@ def get_tfjob_template(
template=pod_template_spec,
)

if num_worker_replicas is not None:
if num_workers is not None:
tfjob.spec.tf_replica_specs[
constants.REPLICA_TYPE_WORKER
] = models.KubeflowOrgV1ReplicaSpec(
replicas=num_worker_replicas,
replicas=num_workers,
template=pod_template_spec,
)

Expand All @@ -307,17 +303,17 @@ def get_tfjob_template(
def get_pytorchjob_template(
name: str,
namespace: str,
master_pod_template_spec: models.V1PodTemplateSpec = None,
worker_pod_template_spec: models.V1PodTemplateSpec = None,
num_worker_replicas: Optional[int] = None,
num_procs_per_worker: Optional[int] = 0,
num_workers: int,
worker_pod_template_spec: Optional[models.V1PodTemplateSpec],
master_pod_template_spec: Optional[models.V1PodTemplateSpec] = None,
num_procs_per_worker: Optional[int] = None,
elastic_policy: Optional[models.KubeflowOrgV1ElasticPolicy] = None,
):
# Check if at least one replica is set.
# Check if at least one Worker is set.
# TODO (andreyvelich): Remove this check once we have CEL validation.
# Ref: https://github.com/kubeflow/training-operator/issues/1708
if num_worker_replicas is None and master_pod_template_spec is None:
raise ValueError("At least one replica for PyTorchJob must be set")
if num_workers is None or num_workers < 0:
raise ValueError("At least one Worker for PyTorchJob must be set")

# Create PyTorchJob template.
pytorchjob = models.KubeflowOrgV1PyTorchJob(
Expand All @@ -330,32 +326,45 @@ def get_pytorchjob_template(
),
)

if num_procs_per_worker > 0:
if num_procs_per_worker:
pytorchjob.spec.nproc_per_node = str(num_procs_per_worker)
if elastic_policy:
pytorchjob.spec.elastic_policy = elastic_policy

# Create Master replica if that is set.
if master_pod_template_spec:
pytorchjob.spec.pytorch_replica_specs[
constants.REPLICA_TYPE_MASTER
] = models.KubeflowOrgV1ReplicaSpec(
replicas=1,
template=master_pod_template_spec,
)
# If we don't define Master template, use the Worker template.
else:
pytorchjob.spec.pytorch_replica_specs[
constants.REPLICA_TYPE_MASTER
] = models.KubeflowOrgV1ReplicaSpec(
replicas=1,
template=worker_pod_template_spec,
)

if num_worker_replicas:
# Create Worker with num_workers - 1 replicas.
# TODO (andreyvelich): Investigate if we can run PyTorchJob without the Master
# Currently, if Master is not set, Training Operator controller
# doesn't set RANK and WORLD_SIZE for PyTorchJob
if num_workers > 1:
pytorchjob.spec.pytorch_replica_specs[
constants.REPLICA_TYPE_WORKER
] = models.KubeflowOrgV1ReplicaSpec(
replicas=num_worker_replicas,
replicas=num_workers - 1,
template=worker_pod_template_spec,
)

return pytorchjob


def get_pvc_spec(
pvc_name: str, namespace: str, storage_size: str, storage_class: str = None
pvc_name: str, namespace: str, storage_size: str, storage_class: Optional[str]
):
if pvc_name is None or namespace is None or storage_size is None:
raise ValueError("One of the arguments is None")
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/test/e2e/test_e2e_pytorchjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ def train_func():
print(f"Start training for Epoch {i}")
time.sleep(1)

num_workers = 1
num_workers = 3

TRAINING_CLIENT.create_job(
name=JOB_NAME,
namespace=job_namespace,
train_func=train_func,
num_worker_replicas=num_workers,
num_workers=num_workers,
)

logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
Expand Down

0 comments on commit de55a09

Please sign in to comment.