diff --git a/.github/workflows/test-python.yaml b/.github/workflows/test-python.yaml
index 28994f2d07..ef21699476 100644
--- a/.github/workflows/test-python.yaml
+++ b/.github/workflows/test-python.yaml
@@ -26,6 +26,6 @@ jobs:
        run: |
          pip install pytest python-dateutil urllib3 kubernetes
          pip install -U './sdk/python[huggingface]'
-
+
      - name: Run unit test for training sdk
-        run: pytest ./sdk/python/kubeflow/training/api/training_client_test.py
\ No newline at end of file
+        run: pytest ./sdk/python/kubeflow/training/api/training_client_test.py
diff --git a/docs/proposals/train_api_proposal.md b/docs/proposals/train_api_proposal.md
index c48878d7ae..a2b8b22a63 100644
--- a/docs/proposals/train_api_proposal.md
+++ b/docs/proposals/train_api_proposal.md
@@ -2,18 +2,20 @@
**

Authors:

**
-* Deepanker Gupta (**[@deepanker13](https://github.com/deepanker13)**), Nutanix
-* Johnu George (**[@johnugeorge](https://github.com/johnugeorge)**), Nutanix
+- Deepanker Gupta (**[@deepanker13](https://github.com/deepanker13)**), Nutanix
+- Johnu George (**[@johnugeorge](https://github.com/johnugeorge)**), Nutanix

**

Status

**
-* 10 Nov 2023 (v1)
-* 30 Nov 2023 (v2)
+- 10 Nov 2023 (v1)
+- 30 Nov 2023 (v2)

**

Goals

**
+
1. To add a higher level train api for fine-tuning/training LLMs.

**

Non Goals / Limitations

**
+
1. The dataset is assumed to be preprocessed by the user.
2. Currently only pytorch framework will be supported for running distributed training.
@@ -30,7 +32,7 @@ LLMs are being widely used for generative AI tasks and as their adoption is incr
**

Background

**
-Currently, there are two flows for data scientists to start using Training operator for their distributed training needs. 
+Currently, there are two flows for data scientists to start using the Training Operator for their distributed training needs.

**

Traditional method

**
@@ -48,7 +50,7 @@ To provide a better user experience, a new higher level SDK was added in[ https:
training_client.create_job(
    name=pytorchjob_name,
    train_func=train_function,
-    num_worker_replicas=3, # How many PyTorch Workers will be created.
+    num_workers=3, # How many PyTorch Workers will be created.
)
```
@@ -67,16 +69,16 @@ dataset_args = datasetProviderClass()
# Arguments related to the trainer code
parameters = {key:value pairs}
trainingClient.train(
-    num_workers=1, 
+    num_workers=1,
    num_procs_per_worker = 1,
-    resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"}, 
-    model_args, 
-    dataset_args, 
+    resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"},
+    model_args,
+    dataset_args,
    parameters
)
```

-Example: 
+Example:

```python
@dataclass
@@ -98,16 +100,16 @@ class HuggingFaceTrainParams:
    transformerClass = field()

trainingClient.train(
-    num_workers=1, 
+    num_workers=1,
    num_procs_per_worker = 1,
-    resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"}, 
+    resources_per_worker={"gpu": "2", "cpu":8, "memory": "16Gi"},
    HuggingFaceModelParams(model='hf://openchat/openchat_3.5', access_token = "hf_..." ),
-    S3DatasetParams(dataset= 's3://doc-example-bucket1/train_dataset', eval_dataset = "s3://doc-example-bucket1/eval_dataset", access_token = "s3 access token", region="us-west-2"), 
+    S3DatasetParams(dataset= 's3://doc-example-bucket1/train_dataset', eval_dataset = "s3://doc-example-bucket1/eval_dataset", access_token = "s3 access token", region="us-west-2"),
    HuggingFaceTrainParams(learning_rate=0.1, transformerClass="Trainer", peft_config = {})
)
```

-The new proposed API takes following arguments 
+The new proposed API takes the following arguments:

1. System parameters - Number of workers, number of resources per workers(GPUs per worker).
2. Model parameters - Model provider and repository details.
@@ -116,7 +118,7 @@ The new proposed API takes following arguments
**

Implementation

**
-1. Setup **init** **containers** that download the model and dataset to a PVC. Based on the specified model provider, corresponding training utility functions will be used. Eg: For Huggingface provider, Huggingface trainer can be used. For this **get_pytorchjob_template** function in the sdk needs to be changed to add init containers spec.. Inorder to download models and data sets, we need to support different providers like kaggle, hugging face, s3 or git lfs. The data can be stored in a shared volume between the init container and the main container.

This way to download models allows using ReadWriteOnce and ReadOnlyMany PVCs. If we adopt the way of creating batch/v1 Job to download models to PVC, we need to force users to prepare ReadWriteOnce and ReadOnlyMany PVCs.

A new folder containing the code for downloading model and dataset can be added to generate the images for init_containers. Abstract classes will be used as base to create when dataset download, model download and training loop is written for base images.
These parameters will be passed as container args or environment variables.
+1. Set up **init containers** that download the model and dataset to a PVC. Based on the specified model provider, the corresponding training utility functions will be used. For example, for the Hugging Face provider, the Hugging Face Trainer can be used. For this, the **get_pytorchjob_template** function in the SDK needs to be changed to add the init container spec. In order to download models and datasets, we need to support different providers such as Kaggle, Hugging Face, S3, or Git LFS. The data can be stored in a volume shared between the init container and the main container.

This way of downloading models allows using ReadWriteOnce and ReadOnlyMany PVCs. If we instead created a batch/v1 Job to download models to the PVC, we would force users to prepare ReadWriteMany PVCs.

A new folder containing the code for downloading the model and dataset can be added to generate the images for the init containers. Abstract classes will be used as the base when the dataset download, model download, and training loop are written for the base images.
These parameters will be passed as container args or environment variables.

```
sdk/python
@@ -129,12 +131,13 @@ sdk/python
       -> storage.py #this is the file which will be invoked from the dockerfile
       -> Dockerfile
```
+
```python
# code present in abstract_model_provider.py
class modelProvider():
    @abstractmethod
    def load_config(self):
-        pass 
+        pass

    @abstractmethod
    def download_model(self):
@@ -154,9 +157,9 @@ class HuggingFace(modelProvider):
training_client.create_job(name="pytorchjob_name",train_func=custom_training_function, num_of_nodes=1, gpus_per_node = 4)
```

-3. We can provide the training function as a **custom_training_function** argument or inside the **base_image** argument of the **create_job** API directly. In case of Hugging Face models, we can use Hugging Face Transformer library’s Trainer class as the training function. 
+3. We can provide the training function as a **custom_training_function** argument or inside the **base_image** argument of the **create_job** API directly. In the case of Hugging Face models, we can use the Hugging Face Transformers library’s Trainer class as the training function.

-4. The launch command of the training job needs to be changed to torchrun to take **nnodes** and **nproc_per_node** into effect inside **get_pod_template_spec** function in the training operator SDK. 
+4. The launch command of the training job needs to be changed to torchrun so that **nnodes** and **nproc_per_node** take effect inside the **get_pod_template_spec** function of the Training Operator SDK.

```python
exec_script = textwrap.dedent(
@@ -169,4 +172,4 @@ exec_script = textwrap.dedent(
            "torchrun",
            "$program_path/ephemeral_script.py\
            """"
)
-```
\ No newline at end of file
+```
diff --git a/examples/sdk/create-pytorchjob-from-func.ipynb b/examples/sdk/create-pytorchjob-from-func.ipynb
index 5ab0cf21c2..08d45afa0e 100644
--- a/examples/sdk/create-pytorchjob-from-func.ipynb
+++ b/examples/sdk/create-pytorchjob-from-func.ipynb
@@ -364,7 +364,7 @@
    "training_client.create_job(\n",
    "    name=pytorchjob_name,\n",
    "    train_func=train_pytorch_model,\n",
-    "    num_worker_replicas=3, # How many PyTorch Workers will be created.\n",
+    "    num_workers=3, # How many PyTorch Workers will be created.\n",
    ")"
   ]
  },
diff --git a/sdk/python/kubeflow/training/api/training_client.py b/sdk/python/kubeflow/training/api/training_client.py
index 4204f1cd17..6ffd4fb898 100644
--- a/sdk/python/kubeflow/training/api/training_client.py
+++ b/sdk/python/kubeflow/training/api/training_client.py
@@ -95,16 +95,18 @@ def __init__(
    def train(
        self,
-        name: str = None,
-        namespace: str = None,
+        name: str,
+        namespace: Optional[str] = None,
        num_workers: int = 1,
        num_procs_per_worker: int = 1,
-        storage_config: Dict[str, str] = {"size": "10Gi", "storage_class": None},
+        storage_config: Dict[str, Optional[str]] = {
+            "size": "10Gi",
+            "storage_class": None,
+        },
        model_provider_parameters=None,
        dataset_provider_parameters=None,
        train_parameters=None,
        resources_per_worker: Union[dict, client.V1ResourceRequirements, None] = None,
-        # Dict[Literal["gpu", "cpu", "memory"], any] = None,
    ):
        """
        Higher level train api
@@ -116,8 +118,9 @@ def train(
            import peft
            import transformers
        except ImportError:
-            print(
-                "train api dependencies not installed. Run pip install -U 'kubeflow-training[huggingface]' "
+            raise ImportError(
" + + "Run: pip install -U 'kubeflow-training[huggingface]' " ) from kubeflow.storage_initializer.s3 import S3DatasetParams from kubeflow.storage_initializer.hugging_face import ( @@ -274,7 +277,7 @@ def train( namespace=namespace, master_pod_template_spec=master_pod_template_spec, worker_pod_template_spec=worker_pod_template_spec, - num_worker_replicas=num_workers - 1, + num_workers=num_workers, num_procs_per_worker=num_procs_per_worker, ) @@ -289,7 +292,7 @@ def create_job( base_image: Optional[str] = None, train_func: Optional[Callable] = None, parameters: Optional[Dict[str, Any]] = None, - num_worker_replicas: Optional[int] = None, + num_workers: Optional[int] = None, num_chief_replicas: Optional[int] = None, num_ps_replicas: Optional[int] = None, packages_to_install: Optional[List[str]] = None, @@ -320,7 +323,7 @@ def create_job( argument to define input parameters for the function. If `train_func` is set, Base Image must support `bash` CLI to execute the training script. parameters: Dict of input parameters that training function might receive. - num_worker_replicas: Number of Worker replicas for the Job. + num_workers: Number of Worker replicas for the Job. num_chief_replicas: Number of Chief replicas for the TFJob. Number of Chief replicas can't be more than 1. num_ps_replicas: Number of Parameter Server replicas for the TFJob. @@ -382,20 +385,21 @@ def create_job( name=name, namespace=namespace, pod_template_spec=pod_template_spec, - num_worker_replicas=num_worker_replicas, + num_workers=num_workers, num_chief_replicas=num_chief_replicas, num_ps_replicas=num_ps_replicas, ) - elif job_kind == constants.PYTORCHJOB_KIND: + elif job_kind == constants.PYTORCHJOB_KIND and num_workers: job = utils.get_pytorchjob_template( name=name, namespace=namespace, worker_pod_template_spec=pod_template_spec, - num_worker_replicas=num_worker_replicas, + num_workers=num_workers, ) else: raise ValueError( - f"Job kind {job_kind} can't be created using function or image" + f"Job kind {job_kind} can't be created using function or image. " + + "Number of Workers must be set." ) # Verify Job object type. @@ -1052,6 +1056,7 @@ def get_job_logs( timeout: Optional, Kubernetes API server timeout in seconds to execute the request. verbose: Whether to get Kubernetes events for Job and corresponding pods. + If you need to get events from all PyTorchJob's Pods, set `isMaster = False`. 
        Returns:
            Dict[str, str]: A dictionary in which the keys are pod names and the
diff --git a/sdk/python/kubeflow/training/api/training_client_test.py b/sdk/python/kubeflow/training/api/training_client_test.py
index 9bf0950734..5527fca9c4 100644
--- a/sdk/python/kubeflow/training/api/training_client_test.py
+++ b/sdk/python/kubeflow/training/api/training_client_test.py
@@ -168,7 +168,7 @@ def __init__(self, kind) -> None:
            "namespace": "test",
            "train_func": lambda: print("Test Training Function"),
            "base_image": "docker.io/test-training",
-            "num_worker_replicas": "3",
+            "num_workers": 3,
            "packages_to_install": ["boto3==1.34.14"],
            "pip_index_url": "https://pypi.custom.com/simple",
        },
@@ -203,5 +203,5 @@ def test_create_job(training_client, test_name, kwargs, expected_output):
        training_client.create_job(**kwargs)
        assert expected_output == "success"
    except Exception as e:
-        assert type(e) == expected_output
+        assert type(e) is expected_output
    print("test execution complete")
diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py
index 559dda4743..85c0d69ec1 100644
--- a/sdk/python/kubeflow/training/utils/utils.py
+++ b/sdk/python/kubeflow/training/utils/utils.py
@@ -251,18 +251,14 @@ def get_tfjob_template(
    name: str,
    namespace: str,
    pod_template_spec: models.V1PodTemplateSpec,
-    num_worker_replicas: Optional[int] = None,
+    num_workers: Optional[int] = None,
    num_chief_replicas: Optional[int] = None,
    num_ps_replicas: Optional[int] = None,
):
    # Check if at least one replica is set.
    # TODO (andreyvelich): Remove this check once we have CEL validation.
    # Ref: https://github.com/kubeflow/training-operator/issues/1708
-    if (
-        num_worker_replicas is None
-        and num_chief_replicas is None
-        and num_ps_replicas is None
-    ):
+    if num_workers is None and num_chief_replicas is None and num_ps_replicas is None:
        raise ValueError("At least one replica for TFJob must be set")

    # Create TFJob template.
@@ -293,11 +289,11 @@ def get_tfjob_template(
            template=pod_template_spec,
        )

-    if num_worker_replicas is not None:
+    if num_workers is not None:
        tfjob.spec.tf_replica_specs[
            constants.REPLICA_TYPE_WORKER
        ] = models.KubeflowOrgV1ReplicaSpec(
-            replicas=num_worker_replicas,
+            replicas=num_workers,
            template=pod_template_spec,
        )

@@ -307,17 +303,17 @@ def get_pytorchjob_template(
    name: str,
    namespace: str,
-    master_pod_template_spec: models.V1PodTemplateSpec = None,
-    worker_pod_template_spec: models.V1PodTemplateSpec = None,
-    num_worker_replicas: Optional[int] = None,
-    num_procs_per_worker: Optional[int] = 0,
+    num_workers: int,
+    worker_pod_template_spec: Optional[models.V1PodTemplateSpec],
+    master_pod_template_spec: Optional[models.V1PodTemplateSpec] = None,
+    num_procs_per_worker: Optional[int] = None,
    elastic_policy: Optional[models.KubeflowOrgV1ElasticPolicy] = None,
):
-    # Check if at least one replica is set.
+    # Check if at least one Worker is set.
    # TODO (andreyvelich): Remove this check once we have CEL validation.
    # Ref: https://github.com/kubeflow/training-operator/issues/1708
-    if num_worker_replicas is None and master_pod_template_spec is None:
-        raise ValueError("At least one replica for PyTorchJob must be set")
+    if num_workers is None or num_workers < 1:
+        raise ValueError("At least one Worker for PyTorchJob must be set")

    # Create PyTorchJob template.
    pytorchjob = models.KubeflowOrgV1PyTorchJob(
@@ -330,11 +326,12 @@
        ),
    )

-    if num_procs_per_worker > 0:
+    if num_procs_per_worker:
        pytorchjob.spec.nproc_per_node = str(num_procs_per_worker)
    if elastic_policy:
        pytorchjob.spec.elastic_policy = elastic_policy

+    # Create Master replica if that is set.
    if master_pod_template_spec:
        pytorchjob.spec.pytorch_replica_specs[
            constants.REPLICA_TYPE_MASTER
@@ -342,12 +339,24 @@
        ] = models.KubeflowOrgV1ReplicaSpec(
            replicas=1,
            template=master_pod_template_spec,
        )
+    # If we don't define Master template, use the Worker template.
+    else:
+        pytorchjob.spec.pytorch_replica_specs[
+            constants.REPLICA_TYPE_MASTER
+        ] = models.KubeflowOrgV1ReplicaSpec(
+            replicas=1,
+            template=worker_pod_template_spec,
+        )

-    if num_worker_replicas:
+    # Create Worker with num_workers - 1 replicas.
+    # TODO (andreyvelich): Investigate if we can run PyTorchJob without the Master
+    # Currently, if Master is not set, Training Operator controller
+    # doesn't set RANK and WORLD_SIZE for PyTorchJob
+    if num_workers > 1:
        pytorchjob.spec.pytorch_replica_specs[
            constants.REPLICA_TYPE_WORKER
        ] = models.KubeflowOrgV1ReplicaSpec(
-            replicas=num_worker_replicas,
+            replicas=num_workers - 1,
            template=worker_pod_template_spec,
        )

@@ -355,7 +364,7 @@ def get_pvc_spec(
-    pvc_name: str, namespace: str, storage_size: str, storage_class: str = None
+    pvc_name: str, namespace: str, storage_size: str, storage_class: Optional[str]
):
    if pvc_name is None or namespace is None or storage_size is None:
        raise ValueError("One of the arguments is None")
diff --git a/sdk/python/test/e2e/test_e2e_pytorchjob.py b/sdk/python/test/e2e/test_e2e_pytorchjob.py
index 82f64d5c04..92c6ae9764 100644
--- a/sdk/python/test/e2e/test_e2e_pytorchjob.py
+++ b/sdk/python/test/e2e/test_e2e_pytorchjob.py
@@ -181,13 +181,13 @@ def train_func():
            print(f"Start training for Epoch {i}")
            time.sleep(1)

-    num_workers = 1
+    num_workers = 3

    TRAINING_CLIENT.create_job(
        name=JOB_NAME,
        namespace=job_namespace,
        train_func=train_func,
-        num_worker_replicas=num_workers,
+        num_workers=num_workers,
    )

    logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
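For clarity, here is a minimal, self-contained sketch of the replica split that the updated `get_pytorchjob_template` in `utils.py` performs: `num_workers` now counts all training pods, one of which always becomes the Master (reusing the Worker pod template when no Master template is passed), while the remaining `num_workers - 1` become Worker replicas. The helper name `split_pytorchjob_replicas` is hypothetical and only illustrates the arithmetic; it is not part of the SDK.

```python
# Illustrative sketch only -- mirrors the Master/Worker split performed by
# utils.get_pytorchjob_template after this change; not SDK code.
from typing import Dict


def split_pytorchjob_replicas(num_workers: int) -> Dict[str, int]:
    # Same validation as the patched get_pytorchjob_template.
    if num_workers is None or num_workers < 1:
        raise ValueError("At least one Worker for PyTorchJob must be set")

    # A Master replica is always created; when no Master pod template is
    # given, the Worker template is reused for it.
    replicas = {"Master": 1}

    # The remaining pods become Worker replicas.
    if num_workers > 1:
        replicas["Worker"] = num_workers - 1
    return replicas


if __name__ == "__main__":
    # num_workers=3, as in the updated e2e test, yields 1 Master + 2 Workers.
    assert split_pytorchjob_replicas(3) == {"Master": 1, "Worker": 2}
    assert split_pytorchjob_replicas(1) == {"Master": 1}
```

This is also why `train()` now passes `num_workers=num_workers` instead of the previous `num_worker_replicas=num_workers - 1`: the subtraction happens in one place, inside `get_pytorchjob_template`.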
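The proposal above introduces an abstract `modelProvider` with `load_config` and `download_model` methods and says the init containers must support providers such as Kaggle, Hugging Face, S3, or Git LFS (the patch itself imports `S3DatasetParams` and a `HuggingFace` provider from `kubeflow.storage_initializer`). Below is a minimal, hypothetical sketch of what one such plug-in could look like; the class name `S3ModelProvider`, its constructor arguments, and the use of boto3 here are illustrative assumptions, not the PR's actual implementation.

```python
# Hypothetical provider sketch, not part of the PR; it only illustrates the
# abstract load_config / download_model interface from the proposal.
import os
from abc import ABC, abstractmethod

import boto3  # the PR's unit tests pin boto3==1.34.14


class ModelProvider(ABC):
    """Mirrors the proposal's abstract modelProvider interface."""

    @abstractmethod
    def load_config(self):
        ...

    @abstractmethod
    def download_model(self):
        ...


class S3ModelProvider(ModelProvider):
    """Downloads a model artifact from S3 into the shared PVC volume."""

    def __init__(self, bucket: str, key: str, download_dir: str):
        # These values would be passed to the init container as container
        # args or environment variables, as described in the proposal.
        self.bucket = bucket
        self.key = key
        self.download_dir = download_dir

    def load_config(self):
        # Credentials are resolved by boto3 (env vars, IAM role, etc.).
        self.client = boto3.client("s3")

    def download_model(self):
        # The init container writes into the volume shared with the main
        # training container.
        os.makedirs(self.download_dir, exist_ok=True)
        dest = os.path.join(self.download_dir, os.path.basename(self.key))
        self.client.download_file(self.bucket, self.key, dest)
```

A concrete provider like this would be invoked from `storage.py` inside the init container image sketched in the proposal's folder layout.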