From 4833aab60d66e5ebf345418026de42d8af27b4cf Mon Sep 17 00:00:00 2001 From: Michael Benayoun Date: Mon, 22 Apr 2024 11:57:38 +0200 Subject: [PATCH] Refacored test --- tests/conftest.py | 2 +- tests/distributed/test_training.py | 170 ----------------------------- tests/test_trainers.py | 139 +++++++++++++++++++++-- 3 files changed, 133 insertions(+), 178 deletions(-) delete mode 100644 tests/distributed/test_training.py diff --git a/tests/conftest.py b/tests/conftest.py index 13f166d06..beed61c29 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -145,7 +145,7 @@ def _hub_test(create_local_cache: bool = False): set_custom_cache_repo_name_in_hf_home(custom_cache_repo_with_seed) if create_local_cache: - yield tuple([custom_cache_repo_with_seed, local_cache_path_with_seed]) + yield (custom_cache_repo_with_seed, local_cache_path_with_seed) else: yield custom_cache_repo_with_seed diff --git a/tests/distributed/test_training.py b/tests/distributed/test_training.py deleted file mode 100644 index d9384dc96..000000000 --- a/tests/distributed/test_training.py +++ /dev/null @@ -1,170 +0,0 @@ -# coding=utf-8 -# Copyright 2023 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests related to training with `neuronx_distributed`.""" - -import json -from pathlib import Path - -import pytest -from datasets import load_dataset -from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer - -from optimum.neuron.training_args import NeuronTrainingArguments -from optimum.neuron.utils.testing_utils import is_trainium_test - -from .distributed import DistributedTest - - -MODEL_NAME = "michaelbenayoun/llama-2-tiny-16layers-random" - - -@is_trainium_test -class TestDistributedTraining(DistributedTest): - CACHE_REPO_NAME = "optimum-internal-testing/optimum-neuron-cache-for-testing" - - @pytest.fixture( - scope="class", - # params=[[2, 1, 1], [2, 2, 1], [2, 1, 2]], - # ids=["dp=2", "tp=2", "pp=2"], - # TODO: fix pp=2 case since it is flaky and can hang. 
- params=[[2, 1, 1], [2, 2, 1]], - ids=["dp=2", "tp=2"], - ) - def parallel_sizes(self, request): - return request.param - - def test_save_and_resume_from_checkpoint(self, parallel_sizes, tmpdir): - from optimum.neuron.trainers import NeuronTrainer - - tmpdir = Path(tmpdir) - _, tp_size, pp_size = parallel_sizes - train_batch_size = 2 - eval_batch_size = 2 - max_steps = 10 - do_eval = True - max_train_samples = 100 - max_eval_samples = 16 - - tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) - tokenizer.pad_token = tokenizer.eos_token - - def create_training_args(output_dir, resume_from_checkpoint=None, max_steps=max_steps): - if isinstance(output_dir, Path): - output_dir = output_dir.as_posix() - if isinstance(resume_from_checkpoint, Path): - resume_from_checkpoint = resume_from_checkpoint.as_posix() - args = NeuronTrainingArguments( - tensor_parallel_size=tp_size, - pipeline_parallel_size=pp_size, - bf16=True, - per_device_train_batch_size=train_batch_size, - per_device_eval_batch_size=eval_batch_size, - max_steps=max_steps, - logging_steps=1, - save_steps=5, - do_eval=do_eval, - output_dir=output_dir, - resume_from_checkpoint=resume_from_checkpoint, - skip_cache_push=False, - ) - return args - - def create_model(): - config = AutoConfig.from_pretrained(MODEL_NAME) - config.num_hidden_layers = 2 * max(1, pp_size) - config.num_attention_heads = 2 - config.num_key_value_heads = 2 - config.problem_type = "single_label_classification" - # config.use_cache = False - model = AutoModelForSequenceClassification.from_pretrained( - MODEL_NAME, config=config, ignore_mismatched_sizes=True - ) - return model - - # First run setting. - first_output_dir = tmpdir / "first_run" - args = create_training_args(first_output_dir) - model = create_model() - - # Dataset preprocessing - raw_datasets = load_dataset("glue", "sst2") - sentence1_key = "sentence" - sentence2_key = None - label_to_id = None - max_seq_length = 32 - padding = "max_length" - - def preprocess_function(examples): - # Tokenize the texts - args = ( - (examples[sentence1_key],) - if sentence2_key is None - else (examples[sentence1_key], examples[sentence2_key]) - ) - result = tokenizer(*args, padding=padding, max_length=max_seq_length, truncation=True) - - # Map labels to IDs (not necessary for GLUE tasks) - if label_to_id is not None and "label" in examples: - result["label"] = [(label_to_id[l] if l != -1 else -1) for l in examples["label"]] - return result - - with args.main_process_first(desc="dataset map pre-processing"): - raw_datasets = raw_datasets.map(preprocess_function, batched=True) - train_dataset = raw_datasets["train"] - train_dataset = train_dataset.select(range(max_train_samples)) - eval_dataset = raw_datasets["validation"] - eval_dataset = eval_dataset.select(range(max_eval_samples)) - - trainer = NeuronTrainer( - model, args=args, train_dataset=train_dataset, eval_dataset=eval_dataset, tokenizer=tokenizer - ) - - train_result = trainer.train() - trainer.evaluate() - trainer.save_metrics("train", train_result.metrics) - - with open(first_output_dir / "train_results.json") as fp: - first_training_report = json.load(fp) - - # Case 1: Resuming from checkpoint by specifying a checkpoint directory. 
- second_output_dir = tmpdir / "second_run" - resume_from_checkpoint = first_output_dir / "checkpoint-5" - args = create_training_args(second_output_dir, resume_from_checkpoint=resume_from_checkpoint) - model = create_model() - trainer = NeuronTrainer( - model, args=args, train_dataset=train_dataset, eval_dataset=eval_dataset, tokenizer=tokenizer - ) - - train_result = trainer.train(resume_from_checkpoint=resume_from_checkpoint) - trainer.evaluate() - trainer.save_metrics("train", train_result.metrics) - - with open(first_output_dir / "train_results.json") as fp: - second_training_report = json.load(fp) - - assert first_training_report["train_loss"] == second_training_report["train_loss"] - - # Case 2: Resuming from checkpoint by specifying an output_dir with checkpoints. - # max_steps + 10 to do a some training steps than the previous run. - second_output_dir = first_output_dir - args = create_training_args(second_output_dir, max_steps=max_steps + 10) - model = create_model() - - trainer = NeuronTrainer( - model, args=args, train_dataset=train_dataset, eval_dataset=eval_dataset, tokenizer=tokenizer - ) - - trainer.train(resume_from_checkpoint=True) - trainer.evaluate() diff --git a/tests/test_trainers.py b/tests/test_trainers.py index ded2c05a8..176398759 100644 --- a/tests/test_trainers.py +++ b/tests/test_trainers.py @@ -15,13 +15,15 @@ """Tests related to the Trainer derived classes.""" import copy +import json import shutil import time from pathlib import Path import pytest +from datasets import load_dataset from huggingface_hub import HfApi -from transformers import AutoTokenizer, LlamaForCausalLM +from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer, LlamaForCausalLM from optimum.neuron import NeuronTrainer, NeuronTrainingArguments from optimum.neuron.distributed.utils import MODEL_PARALLEL_SHARDS_DIR_NAME @@ -49,7 +51,7 @@ # LLAMA_V2_MODEL_NAME = "michaelbenayoun/llama-2-tiny-16layers-32kv-heads-random" -LLAMA_V2_MODEL_NAME = "michaelbenayoun/llama-2-tiny-4kv-heads-4layers-random" +MODEL_NAME = "michaelbenayoun/llama-2-tiny-4kv-heads-4layers-random" @is_trainium_test @@ -66,7 +68,7 @@ def test_get_model_param_count(self, parallel_sizes, tmpdir): _, tp_size, pp_size = parallel_sizes output_dir = Path(tmpdir) - model = get_model(LlamaForCausalLM, LLAMA_V2_MODEL_NAME, tp_size=tp_size, pp_size=pp_size) + model = get_model(LlamaForCausalLM, MODEL_NAME, tp_size=tp_size, pp_size=pp_size) target_num_parameters = sum(p.numel() for p in model.parameters()) @@ -100,8 +102,8 @@ def test_save_checkpoint(self, hub_test, tmpdir, parallel_sizes): tp_rank = get_tensor_model_parallel_rank() pp_rank = get_pipeline_model_parallel_rank() - tokenizer = AutoTokenizer.from_pretrained(LLAMA_V2_MODEL_NAME) - model = get_model(LlamaForCausalLM, LLAMA_V2_MODEL_NAME, tp_size=tp_size, pp_size=pp_size) + tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) + model = get_model(LlamaForCausalLM, MODEL_NAME, tp_size=tp_size, pp_size=pp_size) datasets = create_dummy_causal_lm_dataset(model.config.vocab_size, 120, 1) args = NeuronTrainingArguments( @@ -176,8 +178,8 @@ def test_train_and_eval_use_remote_cache(self, hub_test_with_local_cache, tmpdir num_eval_samples = 100 per_device_eval_batch_size = 16 - tokenizer = AutoTokenizer.from_pretrained(LLAMA_V2_MODEL_NAME) - model = get_model(LlamaForCausalLM, LLAMA_V2_MODEL_NAME, tp_size=tp_size, pp_size=pp_size) + tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) + model = get_model(LlamaForCausalLM, MODEL_NAME, tp_size=tp_size, 
pp_size=pp_size) clone = copy.deepcopy(model) datasets = create_dummy_causal_lm_dataset(model.config.vocab_size, num_train_samples, num_eval_samples) @@ -263,3 +265,126 @@ def test_train_and_eval_use_remote_cache(self, hub_test_with_local_cache, tmpdir assert ( second_training_duration < first_training_duration ), "Second training should be faster because cached graphs can be used." + + def test_save_and_resume_from_checkpoint(self, parallel_sizes, tmpdir): + + tmpdir = Path(tmpdir) + _, tp_size, pp_size = parallel_sizes + train_batch_size = 2 + eval_batch_size = 2 + max_steps = 10 + do_eval = True + max_train_samples = 100 + max_eval_samples = 16 + + tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) + tokenizer.pad_token = tokenizer.eos_token + + def create_training_args(output_dir, resume_from_checkpoint=None, max_steps=max_steps): + if isinstance(output_dir, Path): + output_dir = output_dir.as_posix() + if isinstance(resume_from_checkpoint, Path): + resume_from_checkpoint = resume_from_checkpoint.as_posix() + args = NeuronTrainingArguments( + tensor_parallel_size=tp_size, + pipeline_parallel_size=pp_size, + bf16=True, + per_device_train_batch_size=train_batch_size, + per_device_eval_batch_size=eval_batch_size, + max_steps=max_steps, + logging_steps=1, + save_steps=5, + do_eval=do_eval, + output_dir=output_dir, + resume_from_checkpoint=resume_from_checkpoint, + skip_cache_push=False, + ) + return args + + def create_model(): + config = AutoConfig.from_pretrained(MODEL_NAME) + config.num_hidden_layers = 2 * max(1, pp_size) + config.num_attention_heads = 2 + config.num_key_value_heads = 2 + config.problem_type = "single_label_classification" + # config.use_cache = False + model = AutoModelForSequenceClassification.from_pretrained( + MODEL_NAME, config=config, ignore_mismatched_sizes=True + ) + return model + + # First run setting. + first_output_dir = tmpdir / "first_run" + args = create_training_args(first_output_dir) + model = create_model() + + # Dataset preprocessing + raw_datasets = load_dataset("glue", "sst2") + sentence1_key = "sentence" + sentence2_key = None + label_to_id = None + max_seq_length = 32 + padding = "max_length" + + def preprocess_function(examples): + # Tokenize the texts + args = ( + (examples[sentence1_key],) + if sentence2_key is None + else (examples[sentence1_key], examples[sentence2_key]) + ) + result = tokenizer(*args, padding=padding, max_length=max_seq_length, truncation=True) + + # Map labels to IDs (not necessary for GLUE tasks) + if label_to_id is not None and "label" in examples: + result["label"] = [(label_to_id[l] if l != -1 else -1) for l in examples["label"]] + return result + + with args.main_process_first(desc="dataset map pre-processing"): + raw_datasets = raw_datasets.map(preprocess_function, batched=True) + train_dataset = raw_datasets["train"] + train_dataset = train_dataset.select(range(max_train_samples)) + eval_dataset = raw_datasets["validation"] + eval_dataset = eval_dataset.select(range(max_eval_samples)) + + trainer = NeuronTrainer( + model, args=args, train_dataset=train_dataset, eval_dataset=eval_dataset, tokenizer=tokenizer + ) + + train_result = trainer.train() + trainer.evaluate() + trainer.save_metrics("train", train_result.metrics) + + with open(first_output_dir / "train_results.json") as fp: + first_training_report = json.load(fp) + + # Case 1: Resuming from checkpoint by specifying a checkpoint directory. 
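+        # The first run saved a checkpoint every 5 steps (save_steps=5), so first_output_dir
+        # should contain "checkpoint-5"; the second run restarts training from that state.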
+        second_output_dir = tmpdir / "second_run"
+        resume_from_checkpoint = first_output_dir / "checkpoint-5"
+        args = create_training_args(second_output_dir, resume_from_checkpoint=resume_from_checkpoint)
+        model = create_model()
+        trainer = NeuronTrainer(
+            model, args=args, train_dataset=train_dataset, eval_dataset=eval_dataset, tokenizer=tokenizer
+        )
+
+        train_result = trainer.train(resume_from_checkpoint=resume_from_checkpoint.as_posix())
+        trainer.evaluate()
+        trainer.save_metrics("train", train_result.metrics)
+
+        with open(first_output_dir / "train_results.json") as fp:
+            second_training_report = json.load(fp)
+
+        assert first_training_report["train_loss"] == second_training_report["train_loss"]
+
+        # Case 2: Resuming from checkpoint by specifying an output_dir with checkpoints.
+        # max_steps + 10 so that the second run performs more training steps than the previous run.
+        second_output_dir = first_output_dir
+        args = create_training_args(second_output_dir, max_steps=max_steps + 10)
+        model = create_model()
+
+        trainer = NeuronTrainer(
+            model, args=args, train_dataset=train_dataset, eval_dataset=eval_dataset, tokenizer=tokenizer
+        )
+
+        trainer.train(resume_from_checkpoint=True)
+        trainer.evaluate()
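+        # resume_from_checkpoint=True should make the trainer pick up the most recent
+        # checkpoint found in output_dir (checkpoint-10 from the first run) and train for the
+        # 10 additional steps; this case only checks that resuming this way runs to completion.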