From a1ec3e66baa1fc40fba95a7df7fab3267b62c667 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 15 Mar 2024 16:56:52 +0000 Subject: [PATCH] Distribute datasets Fix access modes in storage config Signed-off-by: Andrey Velichkevich --- .../kubeflow/trainer/hf_llm_training.py | 23 ++++++++++++++++++- .../kubeflow/training/api/training_client.py | 5 ++-- .../kubeflow/training/constants/constants.py | 7 ++++-- sdk/python/kubeflow/training/utils/utils.py | 7 ++++-- 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/sdk/python/kubeflow/trainer/hf_llm_training.py b/sdk/python/kubeflow/trainer/hf_llm_training.py index d54eee9c34..26dd4fbe0e 100644 --- a/sdk/python/kubeflow/trainer/hf_llm_training.py +++ b/sdk/python/kubeflow/trainer/hf_llm_training.py @@ -2,8 +2,10 @@ import logging from urllib.parse import urlparse import json +import os -from datasets import load_from_disk +from datasets import load_from_disk, Dataset +from datasets.distributed import split_dataset_by_node from peft import LoraConfig, get_peft_model import transformers from transformers import ( @@ -83,6 +85,25 @@ def load_and_preprocess_data(dataset_dir, transformer_type, tokenizer): eval_data = None logger.info("Evaluation dataset is not found") + # Distribute dataset across PyTorchJob workers. + RANK = int(os.environ["RANK"]) + WORLD_SIZE = int(os.environ["WORLD_SIZE"]) + logger.info( + f"Distributed dataset across PyTorchJob workers. WORLD_SIZE: {WORLD_SIZE}, RANK: {RANK}" + ) + if isinstance(train_data, Dataset): + train_data = split_dataset_by_node( + train_data, + rank=RANK, + world_size=WORLD_SIZE, + ) + if isinstance(eval_data, Dataset): + eval_data = split_dataset_by_node( + eval_data, + rank=RANK, + world_size=WORLD_SIZE, + ) + return train_data, eval_data diff --git a/sdk/python/kubeflow/training/api/training_client.py b/sdk/python/kubeflow/training/api/training_client.py index b1040622d3..0dfe96772c 100644 --- a/sdk/python/kubeflow/training/api/training_client.py +++ b/sdk/python/kubeflow/training/api/training_client.py @@ -100,9 +100,9 @@ def train( num_workers: int = 1, num_procs_per_worker: int = 1, storage_config: Dict[str, Optional[Union[str, List[str]]]] = { - "size": constants.STORAGE_INITIALIZER_DEFAULT_SIZE, + "size": constants.PVC_DEFAULT_SIZE, "storage_class": None, - "access_modes": ["ReadWriteOnce", "ReadOnlyMany"], + "access_modes": constants.PVC_DEFAULT_ACCESS_MODES, }, model_provider_parameters=None, dataset_provider_parameters=None, @@ -221,7 +221,6 @@ def train( # create worker pod spec worker_pod_template_spec = utils.get_pod_template_spec( containers=[container_spec], - init_containers=[init_container_spec], volumes=[constants.STORAGE_INITIALIZER_VOLUME], ) diff --git a/sdk/python/kubeflow/training/constants/constants.py b/sdk/python/kubeflow/training/constants/constants.py index 470e5281c6..0513c3e31e 100644 --- a/sdk/python/kubeflow/training/constants/constants.py +++ b/sdk/python/kubeflow/training/constants/constants.py @@ -71,8 +71,11 @@ # Constants for Train API. STORAGE_INITIALIZER = "storage-initializer" -# The default value for dataset and model storage. -STORAGE_INITIALIZER_DEFAULT_SIZE = "10Gi" +# The default value for dataset and model storage PVC. +PVC_DEFAULT_SIZE = "10Gi" +# The default value for PVC access modes.
+PVC_DEFAULT_ACCESS_MODES = ["ReadWriteOnce", "ReadOnlyMany"] + # TODO (andreyvelich): We should add image tag for Storage Initializer and Trainer. STORAGE_INITIALIZER_IMAGE = "docker.io/kubeflow/storage-initializer" diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py index 0d9f076a6a..06f76b2164 100644 --- a/sdk/python/kubeflow/training/utils/utils.py +++ b/sdk/python/kubeflow/training/utils/utils.py @@ -383,14 +383,17 @@ def get_pvc_spec( raise ValueError("One of the required storage config argument is None") if "size" not in storage_config: - storage_config["size"] = constants.STORAGE_INITIALIZER_DEFAULT_SIZE + storage_config["size"] = constants.PVC_DEFAULT_SIZE + + if "access_modes" not in storage_config: + storage_config["access_modes"] = constants.PVC_DEFAULT_ACCESS_MODES pvc_spec = models.V1PersistentVolumeClaim( api_version="v1", kind="PersistentVolumeClaim", metadata={"name": pvc_name, "namepsace": namespace}, spec=models.V1PersistentVolumeClaimSpec( - access_modes=storage_config, + access_modes=storage_config["access_modes"], resources=models.V1ResourceRequirements( requests={"storage": storage_config["size"]} ),