Distribute datasets
Fix access modes in storage config

Signed-off-by: Andrey Velichkevich <[email protected]>
andreyvelich committed Mar 15, 2024
1 parent d5bd8b2 commit a1ec3e6
Showing 4 changed files with 34 additions and 8 deletions.
23 changes: 22 additions & 1 deletion sdk/python/kubeflow/trainer/hf_llm_training.py
@@ -2,8 +2,10 @@
import logging
from urllib.parse import urlparse
import json
import os

from datasets import load_from_disk
from datasets import load_from_disk, Dataset
from datasets.distributed import split_dataset_by_node
from peft import LoraConfig, get_peft_model
import transformers
from transformers import (
@@ -83,6 +85,25 @@ def load_and_preprocess_data(dataset_dir, transformer_type, tokenizer):
eval_data = None
logger.info("Evaluation dataset is not found")

# Distribute dataset across PyTorchJob workers.
RANK = int(os.environ["RANK"])
WORLD_SIZE = int(os.environ["WORLD_SIZE"])
logger.info(
f"Distributed dataset across PyTorchJob workers. WORLD_SIZE: {WORLD_SIZE}, RANK: {RANK}"
)
if isinstance(train_data, Dataset):
train_data = split_dataset_by_node(
train_data,
rank=RANK,
world_size=WORLD_SIZE,
)
if isinstance(eval_data, Dataset):
eval_data = split_dataset_by_node(
eval_data,
rank=RANK,
world_size=WORLD_SIZE,
)

return train_data, eval_data


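The new block above shards the training and evaluation splits so that each PyTorchJob worker only sees its own portion of the data. Below is a minimal standalone sketch of the same call, assuming RANK and WORLD_SIZE are injected by the PyTorchJob launcher; the toy dataset and the local-run fallbacks are illustrative only, not part of this commit.

import os

from datasets import Dataset
from datasets.distributed import split_dataset_by_node

# Toy dataset standing in for the training split loaded from disk.
train_data = Dataset.from_dict({"text": [f"sample {i}" for i in range(8)]})

# In a PyTorchJob these variables are set by the launcher; the defaults here
# only make the sketch runnable locally.
rank = int(os.environ.get("RANK", "0"))
world_size = int(os.environ.get("WORLD_SIZE", "2"))

# Each rank keeps only its own shard of the examples.
train_shard = split_dataset_by_node(train_data, rank=rank, world_size=world_size)
print(f"rank {rank} sees {len(train_shard)} of {len(train_data)} examples")

Because split_dataset_by_node only narrows the dataset, the rest of the training loop on each worker is unchanged; it simply iterates over fewer examples.
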
5 changes: 2 additions & 3 deletions sdk/python/kubeflow/training/api/training_client.py
@@ -100,9 +100,9 @@ def train(
num_workers: int = 1,
num_procs_per_worker: int = 1,
storage_config: Dict[str, Optional[Union[str, List[str]]]] = {
"size": constants.STORAGE_INITIALIZER_DEFAULT_SIZE,
"size": constants.PVC_DEFAULT_SIZE,
"storage_class": None,
"access_modes": ["ReadWriteOnce", "ReadOnlyMany"],
"access_modes": constants.PVC_DEFAULT_ACCESS_MODES,
},
model_provider_parameters=None,
dataset_provider_parameters=None,
@@ -221,7 +221,6 @@ def train(
# create worker pod spec
worker_pod_template_spec = utils.get_pod_template_spec(
containers=[container_spec],
init_containers=[init_container_spec],
volumes=[constants.STORAGE_INITIALIZER_VOLUME],
)

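With the defaults moved into constants, callers can still override the PVC settings per job. A hedged usage sketch follows, assuming train() also accepts a job name (not shown in this hunk); the job name and the provider parameter placeholders are illustrative, not values from this commit.

from kubeflow.training import TrainingClient

# Placeholders for the HuggingFace model/dataset provider parameter objects;
# substitute the real dataclasses from the storage initializer package.
model_params = ...    # hypothetical
dataset_params = ...  # hypothetical

client = TrainingClient()
client.train(
    name="llm-fine-tuning",  # hypothetical job name
    num_workers=2,
    num_procs_per_worker=1,
    # Overrides the new PVC defaults (PVC_DEFAULT_SIZE / PVC_DEFAULT_ACCESS_MODES).
    storage_config={
        "size": "20Gi",
        "storage_class": None,
        "access_modes": ["ReadWriteOnce"],
    },
    model_provider_parameters=model_params,
    dataset_provider_parameters=dataset_params,
)
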
7 changes: 5 additions & 2 deletions sdk/python/kubeflow/training/constants/constants.py
@@ -71,8 +71,11 @@

# Constants for Train API.
STORAGE_INITIALIZER = "storage-initializer"
# The default value for dataset and model storage.
STORAGE_INITIALIZER_DEFAULT_SIZE = "10Gi"
# The default value for dataset and model storage PVC.
PVC_DEFAULT_SIZE = "10Gi"
# The default value for PVC access modes.
PVC_DEFAULT_ACCESS_MODES = ["ReadWriteOnce", "ReadOnlyMany"]


# TODO (andreyvelich): We should add image tag for Storage Initializer and Trainer.
STORAGE_INITIALIZER_IMAGE = "docker.io/kubeflow/storage-initializer"
7 changes: 5 additions & 2 deletions sdk/python/kubeflow/training/utils/utils.py
@@ -383,14 +383,17 @@ def get_pvc_spec(
raise ValueError("One of the required storage config arguments is None")

if "size" not in storage_config:
storage_config["size"] = constants.STORAGE_INITIALIZER_DEFAULT_SIZE
storage_config["size"] = constants.PVC_DEFAULT_SIZE

if "access_modes" not in storage_config:
storage_config["access_modes"] = constants.PVC_DEFAULT_ACCESS_MODES

pvc_spec = models.V1PersistentVolumeClaim(
api_version="v1",
kind="PersistentVolumeClaim",
metadata={"name": pvc_name, "namespace": namespace},
spec=models.V1PersistentVolumeClaimSpec(
access_modes=storage_config,
access_modes=storage_config["access_modes"],
resources=models.V1ResourceRequirements(
requests={"storage": storage_config["size"]}
),
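The fix above passes storage_config["access_modes"] into the claim instead of the whole dict, and fills in defaults for any keys the caller omitted. A rough sketch of the resulting object, assuming the standard kubernetes Python client; the PVC name, namespace, and storage class below are illustrative, and V1ObjectMeta stands in for the plain metadata dict used by the SDK.

from kubernetes import client as models

# A caller-supplied config that omits "size" and "access_modes".
storage_config = {"storage_class": "standard"}
storage_config.setdefault("size", "10Gi")  # PVC_DEFAULT_SIZE
storage_config.setdefault("access_modes", ["ReadWriteOnce", "ReadOnlyMany"])  # PVC_DEFAULT_ACCESS_MODES

pvc_spec = models.V1PersistentVolumeClaim(
    api_version="v1",
    kind="PersistentVolumeClaim",
    metadata=models.V1ObjectMeta(name="storage-initializer", namespace="default"),
    spec=models.V1PersistentVolumeClaimSpec(
        access_modes=storage_config["access_modes"],  # a list of modes, not the whole dict
        storage_class_name=storage_config["storage_class"],
        resources=models.V1ResourceRequirements(
            requests={"storage": storage_config["size"]}
        ),
    ),
)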
