From f5bf9e17ed2fc6d9513b3dc397c668c4ee7e29d3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 Feb 2024 00:37:46 -0500 Subject: [PATCH 01/19] Bump ipykernel from 6.29.0 to 6.29.2 (#2994) Bumps [ipykernel](https://github.com/ipython/ipykernel) from 6.29.0 to 6.29.2. - [Release notes](https://github.com/ipython/ipykernel/releases) - [Changelog](https://github.com/ipython/ipykernel/blob/main/CHANGELOG.md) - [Commits](https://github.com/ipython/ipykernel/compare/v6.29.0...v6.29.2) --- updated-dependencies: - dependency-name: ipykernel dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a027340b1e..2f649219c7 100644 --- a/setup.py +++ b/setup.py @@ -104,7 +104,7 @@ def package_files(prefix: str, directory: str, extension: str): 'fasteners==0.18', # object store tests require fasteners 'pytest==7.4.4', 'ipython==8.11.0', - 'ipykernel==6.29.0', + 'ipykernel==6.29.2', 'jupyter==1.0.0', 'yamllint==1.33.0', 'recommonmark==0.7.1', From ab97396bb646f1f1ca76e68b89e17f5d83d5f8a4 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 Feb 2024 00:37:54 -0500 Subject: [PATCH 02/19] Bump yamllint from 1.33.0 to 1.34.0 (#2995) Bumps [yamllint](https://github.com/adrienverge/yamllint) from 1.33.0 to 1.34.0. - [Changelog](https://github.com/adrienverge/yamllint/blob/master/CHANGELOG.rst) - [Commits](https://github.com/adrienverge/yamllint/compare/v1.33.0...v1.34.0) --- updated-dependencies: - dependency-name: yamllint dependency-type: direct:development update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 2f649219c7..6600f716a7 100644 --- a/setup.py +++ b/setup.py @@ -106,7 +106,7 @@ def package_files(prefix: str, directory: str, extension: str): 'ipython==8.11.0', 'ipykernel==6.29.2', 'jupyter==1.0.0', - 'yamllint==1.33.0', + 'yamllint==1.34.0', 'recommonmark==0.7.1', 'sphinx==4.4.0', 'pre-commit>=3.4.0,<4', From 6ff304142bf7289d911c7488378e5a78a8a236d0 Mon Sep 17 00:00:00 2001 From: Max Marion Date: Mon, 12 Feb 2024 08:29:26 -0800 Subject: [PATCH 03/19] Refactor `update_metric` (#2965) * commit one * rm unused imports * Update nlp.py * Update huggingface.py * Update nlp.py Make all args same name * Update huggingface.py * Update nlp.py * Update nlp.py * Update nlp.py * fix * wip * Update composer/metrics/nlp.py Co-authored-by: Daniel King <43149077+dakinggg@users.noreply.github.com> * finish * Update composer/metrics/nlp.py Co-authored-by: Daniel King <43149077+dakinggg@users.noreply.github.com> * add tests, dont return logits, fix linting * rm incorrect asserts * del incorrect asserts * rm pyright ignore * Update composer/metrics/nlp.py Co-authored-by: Daniel King <43149077+dakinggg@users.noreply.github.com> * rm comments --------- Co-authored-by: Jeremy D <115047575+bmosaicml@users.noreply.github.com> Co-authored-by: Jeremy Dohmann Co-authored-by: Daniel King <43149077+dakinggg@users.noreply.github.com> --- composer/metrics/nlp.py | 96 ++++++++++++++++++++++++++----- composer/models/huggingface.py | 11 +--- tests/metrics/test_nlp_metrics.py | 52 ++++++++++++++++- 3 files changed, 134 insertions(+), 25 deletions(-) diff --git a/composer/metrics/nlp.py b/composer/metrics/nlp.py index dd4d665678..b4815ea702 100644 --- a/composer/metrics/nlp.py +++ b/composer/metrics/nlp.py @@ -8,7 +8,7 @@ import re import string import warnings -from typing import Any, Dict, List, Mapping, Optional, Union +from typing import Any, Dict, List, Mapping, Optional, Tuple, Union import numpy as np import torch @@ -196,9 +196,20 @@ def compute(self) -> Tensor: class InContextLearningMetric(Metric): - def update(self, batch: dict, output_logits: torch.Tensor, labels: torch.Tensor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.needs_batch = True + + def update(self, + batch: dict, + output_logits: Optional[torch.Tensor] = None, + labels: Optional[torch.Tensor] = None, + outputs: Optional[torch.Tensor] = None): """Abstract interface for computing an in-context learning metrics. + The `output_logits` argument is deprecated and will be removed in v0.21 while it's functionality will + be moved to `outputs`. + Args: batch (dict): Batch must consist minimally of `input_ids` as well as any other structure needed to compute the metric. @@ -210,6 +221,27 @@ def update(self, batch: dict, output_logits: torch.Tensor, labels: torch.Tensor) """ raise NotImplementedError + @staticmethod + def rename_args(batch: dict, + output_logits: Optional[torch.Tensor] = None, + labels: Optional[torch.Tensor] = None, + outputs: Optional[torch.Tensor] = None) -> Tuple[dict, torch.Tensor, torch.Tensor]: + if outputs is not None and output_logits is not None: + raise ValueError('Cannot use both `outputs` and `output_logits`') + if output_logits is not None: + warnings.warn( + ('`output_logits` has been renamed to `outputs` and will be removed in v0.21'), + DeprecationWarning, + ) + outputs = output_logits + + if labels is None: + raise ValueError('`labels` cannot be None') + if outputs is None: + raise ValueError('`outputs` cannot be None') + + return batch, outputs, labels + class InContextLearningQAAccuracy(InContextLearningMetric): r"""Computes accuracy for In-context learning (ICL) question answering (QA) tasks. @@ -266,9 +298,7 @@ def replace_underscore(text: str) -> str: return white_space_fix(remove_articles(handle_punc(lower(replace_underscore(answer))))).strip() - def update(self, outputs: List[str], labels: List[List[str]], batch: Optional[Dict[str, Any]] = None): - if batch is None: - batch = {} + def update(self, outputs: List[str], labels: List[List[str]], batch: Dict[str, Any]): cot_delimiter = batch.get('cot_delimiter', '') do_normalization = batch.get('do_normalization', True) stopping_criteria = batch.get('stopping_criteria', None) @@ -328,9 +358,18 @@ def __init__(self, dist_sync_on_step: bool = False): self.add_state('correct', default=torch.tensor(0.), dist_reduce_fx='sum') self.add_state('total', default=torch.tensor(0.), dist_reduce_fx='sum') - def update(self, batch: dict, output_logits: torch.Tensor, labels: torch.Tensor): + def update(self, + batch: dict, + output_logits: Optional[torch.Tensor] = None, + labels: Optional[torch.Tensor] = None, + outputs: Optional[torch.Tensor] = None): + batch, outputs, labels = InContextLearningMetric.rename_args(batch=batch, + output_logits=output_logits, + labels=labels, + outputs=outputs) + for batch_idx, cont_idx in enumerate(batch['continuation_indices']): - cont_tok_pred = output_logits[batch_idx].index_select(dim=0, index=cont_idx - 1).argmax(dim=-1) + cont_tok_pred = outputs[batch_idx].index_select(dim=0, index=cont_idx - 1).argmax(dim=-1) cont_tok_targ = labels[batch_idx].index_select(dim=0, index=cont_idx - 1) self.correct += (cont_tok_pred == cont_tok_targ).all().int() @@ -370,11 +409,20 @@ def __init__(self, dist_sync_on_step: bool = False): self.add_state('correct', default=torch.tensor(0.0), dist_reduce_fx='sum') self.add_state('total', default=torch.tensor(0.0), dist_reduce_fx='sum') - def update(self, batch: dict, output_logits: torch.Tensor, labels: torch.Tensor): + def update(self, + batch: dict, + output_logits: Optional[torch.Tensor] = None, + labels: Optional[torch.Tensor] = None, + outputs: Optional[torch.Tensor] = None): + batch, outputs, labels = InContextLearningMetric.rename_args(batch=batch, + output_logits=output_logits, + labels=labels, + outputs=outputs) + perplexities = [] for batch_idx, cont_idx in enumerate(batch['continuation_indices']): # continuation indices refer to indices in the original input's token space - cont_tok_logits = output_logits[batch_idx].index_select(dim=0, index=cont_idx - 1) + cont_tok_logits = outputs[batch_idx].index_select(dim=0, index=cont_idx - 1) # labels have been shifted left by one index, so the cont_idx needs to be shifted as well. cont_tok_targ = labels[batch_idx].index_select(dim=0, index=cont_idx - 1) cross_entropy = F.cross_entropy(cont_tok_logits, cont_tok_targ) @@ -455,11 +503,20 @@ class InContextLearningMCExpectedCalibrationError(InContextLearningExpectedCalib # Make torchmetrics call update only once full_state_update = False - def update(self, batch: Dict[str, Any], output_logits: torch.Tensor, labels: torch.Tensor): - output_logits = torch.softmax(output_logits, dim=2) + def update(self, + batch: dict, + output_logits: Optional[torch.Tensor] = None, + labels: Optional[torch.Tensor] = None, + outputs: Optional[torch.Tensor] = None): + batch, outputs, labels = InContextLearningMetric.rename_args(batch=batch, + output_logits=output_logits, + labels=labels, + outputs=outputs) + + outputs = torch.softmax(outputs, dim=2) probabilites = [] for batch_idx, cont_idx in enumerate(batch['continuation_indices']): - cont_tok_logits = output_logits[batch_idx].index_select(dim=0, index=cont_idx - 1) + cont_tok_logits = outputs[batch_idx].index_select(dim=0, index=cont_idx - 1) cont_tok_targ = labels[batch_idx].index_select(dim=0, index=cont_idx - 1) probability = cont_tok_logits.index_select(dim=1, index=cont_tok_targ).diagonal().mean() probabilites.append(probability) @@ -491,10 +548,19 @@ class InContextLearningLMExpectedCalibrationError(InContextLearningExpectedCalib # Make torchmetrics call update only once full_state_update = False - def update(self, batch: Dict[str, Any], output_logits: torch.Tensor, labels: torch.Tensor): - output_logits = torch.softmax(output_logits, dim=2) + def update(self, + batch: dict, + output_logits: Optional[torch.Tensor] = None, + labels: Optional[torch.Tensor] = None, + outputs: Optional[torch.Tensor] = None): + batch, outputs, labels = InContextLearningMetric.rename_args(batch=batch, + output_logits=output_logits, + labels=labels, + outputs=outputs) + + outputs = torch.softmax(outputs, dim=2) for batch_idx, cont_idx in enumerate(batch['continuation_indices']): - cont_tok_logits = output_logits[batch_idx].index_select(dim=0, index=cont_idx - 1) + cont_tok_logits = outputs[batch_idx].index_select(dim=0, index=cont_idx - 1) cont_tok_pred = cont_tok_logits.argmax(dim=-1) confidence = cont_tok_logits.max(dim=-1).values.min() cont_tok_targ = labels[batch_idx].index_select(dim=0, index=cont_idx - 1) diff --git a/composer/models/huggingface.py b/composer/models/huggingface.py index e84f840fe9..439f8b50fe 100644 --- a/composer/models/huggingface.py +++ b/composer/models/huggingface.py @@ -21,7 +21,6 @@ import torch from torchmetrics import Metric -from composer.metrics import InContextLearningMetric, InContextLearningQAAccuracy from composer.models.base import ComposerModel from composer.utils import MissingConditionalImportError, dist, get_file, import_object, is_model_fsdp, safe_torch_load @@ -532,14 +531,10 @@ def get_metrics(self, is_train: bool = False) -> Dict[str, Metric]: return metrics if metrics else {} def update_metric(self, batch: Any, outputs: Any, metric: Metric) -> None: - if isinstance(metric, InContextLearningQAAccuracy): - assert self.labels is not None - metric.update(batch=batch, outputs=outputs, labels=self.labels) # pyright: ignore [reportGeneralTypeIssues] - elif isinstance(metric, InContextLearningMetric): - assert self.labels is not None - metric.update(batch, outputs, self.labels) # pyright: ignore [reportGeneralTypeIssues] + if getattr(metric, 'needs_batch', False): + metric.update(batch=batch, outputs=outputs, labels=self.labels) else: - metric.update(outputs, self.labels) # pyright: ignore [reportGeneralTypeIssues] + metric.update(outputs, self.labels) def get_metadata(self): model_output = {} diff --git a/tests/metrics/test_nlp_metrics.py b/tests/metrics/test_nlp_metrics.py index 9a3fa6760d..e31cd4d410 100644 --- a/tests/metrics/test_nlp_metrics.py +++ b/tests/metrics/test_nlp_metrics.py @@ -11,8 +11,9 @@ from composer.metrics.nlp import (BinaryF1Score, InContextLearningCodeEvalAccuracy, InContextLearningExpectedCalibrationError, InContextLearningLMAccuracy, InContextLearningLMExpectedCalibrationError, - InContextLearningMCExpectedCalibrationError, InContextLearningMultipleChoiceAccuracy, - InContextLearningQAAccuracy, LanguageCrossEntropy, LanguagePerplexity, MaskedAccuracy) + InContextLearningMCExpectedCalibrationError, InContextLearningMetric, + InContextLearningMultipleChoiceAccuracy, InContextLearningQAAccuracy, + LanguageCrossEntropy, LanguagePerplexity, MaskedAccuracy) @pytest.mark.parametrize('ignore_index', [-100]) @@ -172,6 +173,53 @@ def test_language_perplexity(): assert torch.equal(torch.exp(ce), perplexity) +def test_in_context_learning_rename_args_no_op(): + batch = {'input': [1, 2, 3]} + outputs = torch.Tensor([12, 13, 14]) + labels = torch.Tensor([0, 1, 0]) + batch, outputs, labels = InContextLearningMetric.rename_args(batch=batch, outputs=outputs, labels=labels) + assert batch == {'input': [1, 2, 3]} + assert torch.all(torch.eq(outputs, torch.tensor([12, 13, 14]))) + assert torch.all(torch.eq(labels, torch.tensor([0, 1, 0]))) + + +def test_in_context_learning_rename_args_output_and_output_logits(): + batch = {'input': [1, 2, 3]} + outputs = torch.Tensor([12, 13, 14]) + output_logits = torch.Tensor([.1, .2, .3]) + labels = torch.Tensor([0, 1, 0]) + with pytest.raises(ValueError): + _, _, _ = InContextLearningMetric.rename_args(batch=batch, + outputs=outputs, + labels=labels, + output_logits=output_logits) + + +def test_in_context_learning_rename_args_rename_output_logits(): + batch = {'input': [1, 2, 3]} + output_logits = torch.Tensor([.1, .2, .3]) + labels = torch.Tensor([0, 1, 0]) + batch, outputs, labels = InContextLearningMetric.rename_args(batch=batch, + labels=labels, + output_logits=output_logits) + assert batch == {'input': [1, 2, 3]} + assert torch.all(torch.eq(outputs, torch.Tensor([.1, .2, .3]))) # pyright: ignore [reportGeneralTypeIssues] + assert torch.all(torch.eq(labels, torch.tensor([0, 1, 0]))) + + +def test_in_context_learning_rename_args_fail_on_no_label(): + batch = {'input': [1, 2, 3]} + output_logits = torch.Tensor([.1, .2, .3]) + with pytest.raises(ValueError): + _, _, _ = InContextLearningMetric.rename_args(batch=batch, output_logits=output_logits) + + +def test_in_context_learning_rename_args_fail_on_no_output(): + batch = {'input': [1, 2, 3]} + with pytest.raises(ValueError): + _, _, _ = InContextLearningMetric.rename_args(batch=batch) + + def test_in_context_learning_lm_accuracy(tiny_gpt2_tokenizer): contexts = ['The dog is', 'I love to eat', 'I hate', 'The weather is'] continuations = [' furry', ' pie', ' long lines', ' snowy'] From 6d4575d6bbe83b339bc3a320f843ae7fe20715a5 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Mon, 12 Feb 2024 15:12:19 -0500 Subject: [PATCH 04/19] Add azure integration test (#2996) * add cpu test * new test file * fix --- .github/workflows/daily.yaml | 2 + .github/workflows/pytest-cpu.yaml | 6 + .../object_store/test_azure_object_store.py | 33 ++++++ .../test_integration_gs_object_store.py | 107 ------------------ 4 files changed, 41 insertions(+), 107 deletions(-) create mode 100644 tests/utils/object_store/test_azure_object_store.py delete mode 100644 tests/utils/object_store/test_integration_gs_object_store.py diff --git a/.github/workflows/daily.yaml b/.github/workflows/daily.yaml index 0b428a39b2..3c65b0f4fa 100644 --- a/.github/workflows/daily.yaml +++ b/.github/workflows/daily.yaml @@ -88,6 +88,8 @@ jobs: code-eval-apikey: ${{ secrets.CODE_EVAL_APIKEY }} gcs-key: ${{ secrets.GCS_KEY }} gcs-secret: ${{ secrets.GCS_SECRET }} + azure-account-name: ${{ secrets.AZURE_ACCOUNT_NAME }} + azure-account-access-key: ${{ secrets.AZURE_ACCOUNT_ACCESS_KEY }} coverage: uses: ./.github/workflows/coverage.yaml name: Coverage Results diff --git a/.github/workflows/pytest-cpu.yaml b/.github/workflows/pytest-cpu.yaml index 3f237424ba..af95f8918f 100644 --- a/.github/workflows/pytest-cpu.yaml +++ b/.github/workflows/pytest-cpu.yaml @@ -45,6 +45,10 @@ on: required: false gcs-secret: required: false + azure-account-name: + required: false + azure-account-access-key: + required: false jobs: pytest-cpu: timeout-minutes: 30 @@ -75,6 +79,8 @@ jobs: export CODE_EVAL_APIKEY='${{ secrets.code-eval-apikey }}' export GCS_KEY='${{ secrets.gcs-key }}' export GCS_SECRET='${{ secrets.gcs-secret }}' + export AZURE_ACCOUNT_NAME='${{ secrets.azure-account-name }}' + export AZURE_ACCOUNT_ACCESS_KEY='${{ secrets.azure-account-access-key }}' export S3_BUCKET='${{ inputs.pytest-s3-bucket }}' export COMMON_ARGS="-v --durations=20 -m '${{ inputs.pytest-markers }}' --s3_bucket '$S3_BUCKET' \ -o tmp_path_retention_policy=none" diff --git a/tests/utils/object_store/test_azure_object_store.py b/tests/utils/object_store/test_azure_object_store.py new file mode 100644 index 0000000000..949e2149ff --- /dev/null +++ b/tests/utils/object_store/test_azure_object_store.py @@ -0,0 +1,33 @@ +# Copyright 2022 MosaicML Composer authors +# SPDX-License-Identifier: Apache-2.0 + +import pytest +from torch.utils.data import DataLoader + +from composer.trainer import Trainer +from tests.common import RandomClassificationDataset, SimpleModel + + +@pytest.mark.remote +def test_azure_object_store_integration(): + model = SimpleModel() + train_dataloader = DataLoader(dataset=RandomClassificationDataset()) + trainer_save = Trainer( + model=model, + train_dataloader=train_dataloader, + save_folder='azure://mosaicml-composer-tests/checkpoints/{run_name}', + save_filename='test-model.pt', + max_duration='1ba', + ) + run_name = trainer_save.state.run_name + trainer_save.fit() + trainer_save.close() + + trainer_load = Trainer( + model=model, + train_dataloader=train_dataloader, + load_path=f'azure://mosaicml-composer-tests/checkpoints/{run_name}/test-model.pt', + max_duration='2ba', + ) + trainer_load.fit() + trainer_load.close() diff --git a/tests/utils/object_store/test_integration_gs_object_store.py b/tests/utils/object_store/test_integration_gs_object_store.py deleted file mode 100644 index 1a08bb73ce..0000000000 --- a/tests/utils/object_store/test_integration_gs_object_store.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright 2022 MosaicML Composer authors -# SPDX-License-Identifier: Apache-2.0 - -import time -from pathlib import Path - -import pytest - -from composer.utils import GCSObjectStore - -__DUMMY_OBJ__ = '/tmp/dummy.ckpt' -__NUM_BYTES__ = 1000 -bucket_name = 'mosaicml-composer-tests' - - -@pytest.mark.remote -@pytest.fixture -def gs_object_store(): - pytest.skip('Run this test suite only after GCS service account is configured on CI node.') - remote_dir = 'gs://mosaicml-composer-tests/streaming/' - yield GCSObjectStore(remote_dir) - - -@pytest.mark.remote -def test_bucket_not_found(): - pytest.skip('Run this test suite only after GCS service account is configured on CI node.') - with pytest.raises(FileNotFoundError): - _ = GCSObjectStore('gs://not_a_bucket/streaming') - - -@pytest.mark.remote -def test_get_uri(gs_object_store): - pytest.skip('Run this test suite only after GCS service account is configured on CI node.') - object_name = 'test-object' - expected_uri = 'gs://mosaicml-composer-tests/streaming/test-object' - assert (gs_object_store.get_uri(object_name) == expected_uri) - - -@pytest.mark.remote -def test_get_key(gs_object_store): - pytest.skip('Run this test suite only after GCS service account is configured on CI node.') - object_name = 'test-object' - expected_key = 'streaming/test-object' - assert (gs_object_store.get_key(object_name) == expected_key) - - -@pytest.mark.remote -@pytest.mark.parametrize('result', ['success', 'not found']) -def test_get_object_size(gs_object_store, result: str): - pytest.skip('Run this test suite only after GCS service account is configured on CI node.') - fn = Path(__DUMMY_OBJ__) - with open(fn, 'wb') as fp: - fp.write(bytes('0' * __NUM_BYTES__, 'utf-8')) - gs_object_store.upload_object(fn) - - if result == 'success': - assert (gs_object_store.get_object_size(__DUMMY_OBJ__) == __NUM_BYTES__) - else: # not found - with pytest.raises(FileNotFoundError): - gs_object_store.get_object_size(__DUMMY_OBJ__ + f'time.ctime()') - - -@pytest.mark.remote -def test_upload_object(gs_object_store): - pytest.skip('Run this test suite only after GCS service account is configured on CI node.') - from google.cloud.storage import Blob - destination_blob_name = '/tmp/dummy.ckpt2' - key = gs_object_store.get_key(destination_blob_name) - stats = Blob(bucket=gs_object_store.bucket, name=key).exists(gs_object_store.client) - if not stats: - gs_object_store.upload_object(__DUMMY_OBJ__, destination_blob_name) - - -@pytest.mark.remote -def test_list_objects(gs_object_store): - pytest.skip('Run this test suite only after GCS service account is configured on CI node.') - from google.cloud.storage import Blob - destination_blob_name = '/tmp/dummy.ckpt2' - key = gs_object_store.get_key(destination_blob_name) - stats = Blob(bucket=gs_object_store.bucket, name=key).exists(gs_object_store.client) - if not stats: - gs_object_store.upload_object(__DUMMY_OBJ__, destination_blob_name) - objects = gs_object_store.list_objects() - assert (key in objects) - - -@pytest.mark.remote -@pytest.mark.parametrize('result', ['success', 'file_exists', 'obj_not_found']) -def test_download_object(gs_object_store, tmp_path, result: str): - pytest.skip('Run this test suite only after GCS service account is configured on CI node.') - fn = Path(__DUMMY_OBJ__) - with open(fn, 'wb') as fp: - fp.write(bytes('0' * __NUM_BYTES__, 'utf-8')) - gs_object_store.upload_object(fn) - - object_name = __DUMMY_OBJ__ - filename = './dummy.ckpt.download' - - if result == 'success': - gs_object_store.download_object(object_name, filename, overwrite=True) - - elif result == 'file_exists': - with pytest.raises(FileExistsError): - gs_object_store.download_object(object_name, __DUMMY_OBJ__) - else: # obj_not_found - with pytest.raises(FileNotFoundError): - gs_object_store.download_object(object_name + f'{time.ctime()}', filename, overwrite=True) From 375ea0cc3f8e0f5fadf77825f06f2909f87e9fe3 Mon Sep 17 00:00:00 2001 From: bigning Date: Mon, 12 Feb 2024 12:45:37 -0800 Subject: [PATCH 05/19] Fix Profiler schedule skip_first (#2992) * fix skip_first for resumption * update doc * v2 * move after_load callback to profiler * fix unit tests --------- Co-authored-by: Mihir Patel --- composer/profiler/profiler.py | 13 +++++++-- composer/profiler/profiler_schedule.py | 19 +++++++----- tests/callbacks/callback_settings.py | 5 ++++ tests/profiler/test_profiler.py | 40 +++++++++++++++++++++++++- 4 files changed, 67 insertions(+), 10 deletions(-) diff --git a/composer/profiler/profiler.py b/composer/profiler/profiler.py index c88c1f0912..a3a7127e58 100644 --- a/composer/profiler/profiler.py +++ b/composer/profiler/profiler.py @@ -9,6 +9,8 @@ import pathlib from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Sequence, Tuple, Union +from composer.core import Callback +from composer.loggers import Logger from composer.profiler.json_trace_handler import JSONTraceHandler from composer.profiler.marker import Marker from composer.profiler.profiler_action import ProfilerAction @@ -18,14 +20,14 @@ from composer.utils import ensure_tuple, parse_uri if TYPE_CHECKING: - from composer.core import Callback, State + from composer.core import State __all__ = ['Profiler'] log = logging.getLogger(__name__) -class Profiler: +class Profiler(Callback): """Composer Profiler. See the :doc:`Profiling Guide ` for additional information. @@ -118,6 +120,8 @@ def __init__( self.schedule = schedule self.state = None self._callbacks: List[Callback] = [] + # Used to count skip_first starting from resumption timestamp + self.resumption_batch_idx: int = 0 self.remote_filenames: List[str] = [] # First, add each remote file name to self.remote_filenames to create RemoteUploaderDownloader logger in trainer. [s3://bucket/path/to/file] # Then modify remote file name to be a local path to pass into torch_profiler and system_profiler. e.g: path/to/file @@ -185,6 +189,7 @@ def bind_to_state( state (State): The training state. """ self.state = state + self.state.callbacks.append(self) self.state.callbacks.extend(self._callbacks) self.state.callbacks.extend(self._trace_handlers) @@ -289,3 +294,7 @@ def should_record(state: State) -> bool: ) self._names_to_markers[name].categories = categories return self._names_to_markers[name] + + def after_load(self, state: State, logger: Logger) -> None: + del logger + self.resumption_batch_idx = int(state.timestamp.batch_in_epoch) diff --git a/composer/profiler/profiler_schedule.py b/composer/profiler/profiler_schedule.py index 02b72b8a50..08d2549c2b 100644 --- a/composer/profiler/profiler_schedule.py +++ b/composer/profiler/profiler_schedule.py @@ -23,10 +23,11 @@ def cyclic_schedule( This function returns a schedule function that uses a cyclic profiling window. The resulting function can be passed as the ``prof_schedule`` argument to the :class:`.Trainer`. - The cyclic window skips the first ``skip_first`` batches in every epoch. Then, it performs a cycle of - skipping ``wait`` batches, warming up for ``warmup`` batches, and recording ``active`` batches. - It repeats this cycle up to ``repeat`` times per epoch (or for the entire epoch, if ``repeat`` is 0). - This logic repeats every epoch. + The cyclic window skips the first ``skip_first`` + ``resumption_batch_idx`` batches in every epoch. + ``resumption_batch_idx`` is accessed from state.profiler. It is the ``state.timestamp.batch_in_epoch`` + when resuming training. Then, it performs a cycle of skipping ``wait`` batches, warming up for ``warmup`` + batches, and recording ``active`` batches. It repeats this cycle up to ``repeat`` times per epoch (or + for the entire epoch, if ``repeat`` is 0). This logic repeats every epoch. Args: skip_first (int, optional): Number of batches to skip profiling at epoch start. Defaults to ``0``. @@ -46,12 +47,16 @@ def schedule(state: State): # do wait, then warump, then active, up to repeat times per cycle cycle_len = wait + warmup + active batch_idx = int(state.timestamp.batch_in_epoch) - if batch_idx < skip_first: + if state.profiler is not None: + skip_first_after_resumption = skip_first + state.profiler.resumption_batch_idx + else: + skip_first_after_resumption = skip_first + if batch_idx < skip_first_after_resumption: return ProfilerAction.SKIP - if repeat != 0 and batch_idx >= cycle_len * repeat + skip_first: + if repeat != 0 and batch_idx >= cycle_len * repeat + skip_first_after_resumption: # exhausted the repeat return ProfilerAction.SKIP - position_in_cycle = (batch_idx - skip_first) % cycle_len + position_in_cycle = (batch_idx - skip_first_after_resumption) % cycle_len if position_in_cycle < wait: return ProfilerAction.SKIP if position_in_cycle < wait + warmup: diff --git a/tests/callbacks/callback_settings.py b/tests/callbacks/callback_settings.py index 22a4a09b58..f6065c1863 100644 --- a/tests/callbacks/callback_settings.py +++ b/tests/callbacks/callback_settings.py @@ -3,6 +3,7 @@ import os from typing import Any, Dict, List, Tuple, Type +from unittest.mock import MagicMock import pytest from torch.utils.data import DataLoader @@ -125,6 +126,10 @@ NeptuneLogger: { 'mode': 'debug', }, + composer.profiler.Profiler: { + 'trace_handlers': [MagicMock()], + 'schedule': composer.profiler.cyclic_schedule(), + } } _callback_marks: Dict[Type[Callback], List[pytest.MarkDecorator],] = { diff --git a/tests/profiler/test_profiler.py b/tests/profiler/test_profiler.py index 2ae9383d79..f13be17486 100644 --- a/tests/profiler/test_profiler.py +++ b/tests/profiler/test_profiler.py @@ -9,8 +9,10 @@ import pytest import torch from packaging import version +from torch.profiler.profiler import ProfilerAction as TorchProfilerAction -from composer.core import State +from composer.core import Engine, Event, State, Timestamp +from composer.loggers import Logger from composer.profiler import Profiler, ProfilerAction, SystemProfiler, TorchProfiler, cyclic_schedule from composer.profiler.utils import export_memory_timeline_html @@ -170,3 +172,39 @@ def test_memory_timeline(tmp_path: pathlib.Path) -> None: assert fig is not None, 'export_memory_timeline_html should return a figure when return_fig=True' _, end = fig.gca().get_ylim() assert round(end, 2) == 0.06 + + +def test_skip_first_after_resumption(minimal_state: State) -> None: + skip_first = 1 + wait = 2 + warmup = 3 + active = 4 + repeat = 1 + schedule = cyclic_schedule(skip_first=skip_first, wait=wait, warmup=warmup, active=active, repeat=repeat) + mock_trace_handler = MagicMock() + profiler = Profiler( + trace_handlers=[mock_trace_handler], + schedule=schedule, + ) + profiler.bind_to_state(minimal_state) + minimal_state.profiler = profiler + + assert len(profiler._callbacks) >= 1 + assert isinstance(profiler._callbacks[-1], TorchProfiler) + torch_profiler = profiler._callbacks[-1] + + # Create torch.profiler.profile + logger = Logger(minimal_state) + engine = Engine(state=minimal_state, logger=logger) + engine.run_event(Event.INIT) + assert torch_profiler.profiler is not None + + minimal_state.timestamp = Timestamp(batch_in_epoch=7) + assert torch_profiler.profiler.schedule(0) == TorchProfilerAction.RECORD + + # Load checkpoint at batch 4 + minimal_state.timestamp = Timestamp(batch_in_epoch=4) + engine.run_event(Event.BEFORE_LOAD) + engine.run_event(Event.AFTER_LOAD) + minimal_state.timestamp = Timestamp(batch_in_epoch=7) + assert torch_profiler.profiler.schedule(0) == TorchProfilerAction.WARMUP From a88c7fedaa1f89aea01343dc85eafea8e66f0e4f Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Mon, 12 Feb 2024 20:12:23 -0500 Subject: [PATCH 06/19] Remove planner validation (#2985) * remove * sync * lint --- composer/utils/checkpoint.py | 50 ++----------------- .../eval_client/test_local_eval_client.py | 9 ++-- 2 files changed, 10 insertions(+), 49 deletions(-) diff --git a/composer/utils/checkpoint.py b/composer/utils/checkpoint.py index 7c03aaa50c..4f0b0937fe 100644 --- a/composer/utils/checkpoint.py +++ b/composer/utils/checkpoint.py @@ -545,8 +545,6 @@ def load_sharded_checkpoint( if state.fsdp_config is None: raise ValueError('Loading a sharded checkpoint requires passing an FSDP config to Trainer.') - load_planner = state.fsdp_config['load_planner'] - _validate_load_planner(load_planner) # Check to make sure source_path is a directory. if object_store is None: @@ -603,14 +601,14 @@ def load_sharded_checkpoint( dist_cp.load( # type: ignore state_dict=state_dict, storage_reader=storage_reader, - planner=load_planner, + planner=state.fsdp_config['load_planner'], no_dist=(not dist.is_initialized()), ) else: dist_cp.load_state_dict( state_dict=state_dict, storage_reader=storage_reader, - planner=load_planner, + planner=state.fsdp_config['load_planner'], no_dist=(not dist.is_initialized()), ) @@ -822,40 +820,6 @@ def filter_func(state_dict: dict) -> None: return filter_func -def _validate_save_planner(save_planner: Optional[Any]) -> None: - """Checks that ``save_planner`` is an instance of a :class:`~torch.distributed.checkpoint.planner.SavePlanner`. - - TODO(GRT-2456): Remove validation once we deprecate torch 1.13 and can use - type hints. - - Raises: - ValueError: If ``save_planner`` is not a - :class:`~torch.distributed.checkpoint.planner.SavePlanner`. - """ - from torch.distributed.checkpoint.planner import SavePlanner - - if save_planner is not None and not isinstance(save_planner, SavePlanner): - raise ValueError((f'save_planner {type(save_planner)} is not a ' - 'torch.distributed.checkpoint.planner.SavePlanner')) - - -def _validate_load_planner(load_planner: Optional[Any]) -> None: - """Checks that ``load_planner`` is an instance of a :class:`~torch.distributed.checkpoint.planner.LoadPlanner`. - - TODO(GRT-2456): Remove validation once we deprecate torch 1.13 and can use - type hints. - - Raises: - ValueError: If ``load_planner`` is not a - :class:`~torch.distributed.checkpoint.planner.LoadPlanner`. - """ - from torch.distributed.checkpoint.planner import LoadPlanner - - if load_planner is not None and not isinstance(load_planner, LoadPlanner): - raise ValueError((f'load_planner {type(load_planner)} is not a ' - 'torch.distributed.checkpoint.planner.LoadPlanner')) - - def safe_torch_load( composer_states_filepath: Union[Path, str], map_location: str = 'cpu', @@ -1060,14 +1024,10 @@ def _save_checkpoint( _save_deepspeed_model(state.deepspeed_model, save_filename) - # Sharded checkpointing for torch >=2.0 uses the torch.distributed.checkpoint module. + # Sharded checkpointing elif state.fsdp_elastic_sharded_enabled: if state.fsdp_config is None: raise ValueError('Saving a sharded checkpoint requires passing an FSDP config to Trainer.') - save_planner = state.fsdp_config['save_planner'] - _validate_save_planner(save_planner) - - import torch.distributed.checkpoint as dist_cp log.debug(f'Saving sharded checkpoints to {save_filename}...') process_group = None @@ -1086,14 +1046,14 @@ def _save_checkpoint( dist_cp.save( # type: ignore state_dict=state_dict, storage_writer=dist_cp.FileSystemWriter(dirname), - planner=save_planner, + planner=state.fsdp_config['save_planner'], process_group=process_group, ) else: dist_cp.save_state_dict( state_dict=state_dict, storage_writer=dist_cp.FileSystemWriter(dirname), - planner=save_planner, + planner=state.fsdp_config['save_planner'], process_group=process_group, ) log.debug('Finished pytorch save state dict') diff --git a/tests/utils/eval_client/test_local_eval_client.py b/tests/utils/eval_client/test_local_eval_client.py index 8a598608d0..b114096ad3 100644 --- a/tests/utils/eval_client/test_local_eval_client.py +++ b/tests/utils/eval_client/test_local_eval_client.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import pytest -from composer.utils import LocalEvalClient +from composer.utils import LocalEvalClient, dist from tests.common.markers import world_size @@ -29,10 +29,11 @@ ) @world_size(1, 2) def test_local_invoke(code: str, result: str, language: str, world_size: int, tmp_path: str): - """Test invocation function for LocalEvalClient with code that succeeds, fails compilation, times out, and is incorrect in C, C++, Python, JS. + """Test invocation function for LocalEvalClient. + + Code can succeed, fail compilation, time out, or be incorrect in C, C++, Python, JS. """ - import os - os.makedirs(os.path.dirname(tmp_path), exist_ok=True) + dist.barrier() # Ensure all processes are ready to run the test as invoke doesn't use dist eval_client = LocalEvalClient() input = '(1,)' if language == 'python' else '1' assert eval_client.invoke([[[{ From d2b9cd3e3b5d1b0a69ed093e26592f02e8f9d0ec Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Mon, 12 Feb 2024 20:38:30 -0500 Subject: [PATCH 07/19] Fix load for non-HSDP device mesh (#2997) * fix * lint * lint * fix --- composer/utils/checkpoint.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/composer/utils/checkpoint.py b/composer/utils/checkpoint.py index 4f0b0937fe..ad814f84f4 100644 --- a/composer/utils/checkpoint.py +++ b/composer/utils/checkpoint.py @@ -213,7 +213,9 @@ def __init__(self, source_path: str, destination_path: str, object_store: Union[ super().__init__(destination_path) def read_data(self, plan: LoadPlan, planner: LoadPlanner): - first_replica = self.device_mesh is None or self.device_mesh.get_local_rank(mesh_dim=0) == 0 + # Download files if not using HSDP or if on first replica with HSDP enabled + first_replica = self.device_mesh is None or self.device_mesh.ndim == 1 or ( + self.device_mesh.ndim >= 2 and self.device_mesh.get_local_rank(mesh_dim=0) == 0) # 1. Download to the destination all files this rank needs if on first replica if first_replica: From b1e7db4f5655a338df508da4a02a67d46991214a Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Mon, 12 Feb 2024 21:42:39 -0500 Subject: [PATCH 08/19] fix (#3000) --- composer/cli/launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer/cli/launcher.py b/composer/cli/launcher.py index c72e1b01db..f26030c745 100755 --- a/composer/cli/launcher.py +++ b/composer/cli/launcher.py @@ -291,7 +291,7 @@ def _launch_processes( MASTER_ADDR=master_addr, MASTER_PORT=str(master_port), PYTHONUNBUFFERED='1', - NCCL_ASYNC_ERROR_HANDLING='1', + TORCH_NCCL_ASYNC_ERROR_HANDLING='1', ): # Populate the distributed variables in all launcher args for arg in training_script_args: From 94b63d74b8a54ccaae97c26b43b5da4dec50f1ce Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Mon, 12 Feb 2024 21:51:23 -0500 Subject: [PATCH 09/19] Add bias argument to LPLN (#2999) * add bias * lint * daniel is so picky but also usually right --- .../low_precision_layernorm.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py b/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py index 64ffaebb11..6baffd43e3 100644 --- a/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py +++ b/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py @@ -75,11 +75,21 @@ def apply(self, event: Event, state: State, logger: Logger) -> Optional[int]: class LPLayerNorm(torch.nn.LayerNorm): - def __init__(self, normalized_shape, eps=1e-05, elementwise_affine=True, device=None, dtype=None): + def __init__( + self, + normalized_shape, + eps: float = 1e-05, + elementwise_affine: bool = True, + *, + bias: bool = True, + device=None, + dtype=None, + ): super().__init__( normalized_shape=normalized_shape, eps=eps, elementwise_affine=elementwise_affine, + bias=bias, device=device, dtype=dtype, ) @@ -111,7 +121,7 @@ def _to_LPLayerNorm(layer: torch.nn.Module, module_index: int) -> LPLayerNorm: """Defines a replacement policy from a `torch.nn.LayerNorm` to a `LPLayerNorm`""" if not isinstance(layer, torch.nn.LayerNorm): raise TypeError(f'Expected torch.nn.LayerNorm, got {type(layer)}') - lp_layernorm = LPLayerNorm(layer.normalized_shape, layer.eps, layer.elementwise_affine) + lp_layernorm = LPLayerNorm(layer.normalized_shape, layer.eps, layer.elementwise_affine) # type: ignore with torch.no_grad(): if layer.weight is None: # pyright: ignore[reportUnnecessaryComparison] From 17bfa7e4051b6d5e4e0b211340335fc9857f8ffd Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 13 Feb 2024 12:01:33 -0500 Subject: [PATCH 10/19] Revert "Add bias argument to LPLN (#2999)" (#3003) This reverts commit 94b63d74b8a54ccaae97c26b43b5da4dec50f1ce. --- .../low_precision_layernorm.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py b/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py index 6baffd43e3..64ffaebb11 100644 --- a/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py +++ b/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py @@ -75,21 +75,11 @@ def apply(self, event: Event, state: State, logger: Logger) -> Optional[int]: class LPLayerNorm(torch.nn.LayerNorm): - def __init__( - self, - normalized_shape, - eps: float = 1e-05, - elementwise_affine: bool = True, - *, - bias: bool = True, - device=None, - dtype=None, - ): + def __init__(self, normalized_shape, eps=1e-05, elementwise_affine=True, device=None, dtype=None): super().__init__( normalized_shape=normalized_shape, eps=eps, elementwise_affine=elementwise_affine, - bias=bias, device=device, dtype=dtype, ) @@ -121,7 +111,7 @@ def _to_LPLayerNorm(layer: torch.nn.Module, module_index: int) -> LPLayerNorm: """Defines a replacement policy from a `torch.nn.LayerNorm` to a `LPLayerNorm`""" if not isinstance(layer, torch.nn.LayerNorm): raise TypeError(f'Expected torch.nn.LayerNorm, got {type(layer)}') - lp_layernorm = LPLayerNorm(layer.normalized_shape, layer.eps, layer.elementwise_affine) # type: ignore + lp_layernorm = LPLayerNorm(layer.normalized_shape, layer.eps, layer.elementwise_affine) with torch.no_grad(): if layer.weight is None: # pyright: ignore[reportUnnecessaryComparison] From 0d041acb370f46395cac721b9e92fac277ac527b Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 13 Feb 2024 12:01:41 -0500 Subject: [PATCH 11/19] Revert "fix (#3000)" (#3004) This reverts commit b1e7db4f5655a338df508da4a02a67d46991214a. --- composer/cli/launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer/cli/launcher.py b/composer/cli/launcher.py index f26030c745..c72e1b01db 100755 --- a/composer/cli/launcher.py +++ b/composer/cli/launcher.py @@ -291,7 +291,7 @@ def _launch_processes( MASTER_ADDR=master_addr, MASTER_PORT=str(master_port), PYTHONUNBUFFERED='1', - TORCH_NCCL_ASYNC_ERROR_HANDLING='1', + NCCL_ASYNC_ERROR_HANDLING='1', ): # Populate the distributed variables in all launcher args for arg in training_script_args: From 7f9c42510abab1a6ba3571121cd6f98ebc020e3d Mon Sep 17 00:00:00 2001 From: Charles Tang Date: Tue, 13 Feb 2024 10:27:59 -0800 Subject: [PATCH 12/19] Add torch 2.3 image for aws cluster (#3002) --- docker/README.md | 1 + docker/build_matrix.yaml | 27 +++++++++++++++++++++++++++ docker/generate_build_matrix.py | 17 +++++++++++++++++ 3 files changed, 45 insertions(+) diff --git a/docker/README.md b/docker/README.md index 62a265adf8..8ea2217bc9 100644 --- a/docker/README.md +++ b/docker/README.md @@ -32,6 +32,7 @@ To install composer, once inside the image, run `pip install mosaicml`. |----------------|----------|-------------------|---------------------|------------------|------------------------------------------------------------------------------------------| | Ubuntu 20.04 | Base | 2.3.0 | 12.1.0 (Infiniband) | 3.11 | `mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.11-ubuntu20.04` | | Ubuntu 20.04 | Base | 2.3.0 | 12.1.0 (Infiniband) | 3.10 | `mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04` | +| Ubuntu 20.04 | Base | 2.3.0 | 12.1.0 (EFA) | 3.10 | `mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04` | | Ubuntu 20.04 | Base | 2.2.0 | 12.1.0 (Infiniband) | 3.11 | `mosaicml/pytorch:2.2.0_cu121-python3.11-ubuntu20.04` | | Ubuntu 20.04 | Base | 2.2.0 | 12.1.0 (EFA) | 3.11 | `mosaicml/pytorch:2.2.0_cu121-python3.11-ubuntu20.04-aws` | | Ubuntu 20.04 | Base | 2.2.0 | cpu | 3.11 | `mosaicml/pytorch:2.2.0_cpu-python3.11-ubuntu20.04` | diff --git a/docker/build_matrix.yaml b/docker/build_matrix.yaml index 2dd849432c..e6c744847d 100644 --- a/docker/build_matrix.yaml +++ b/docker/build_matrix.yaml @@ -190,6 +190,33 @@ - mosaicml/pytorch:2.0.1_cpu-python3.10-ubuntu20.04 TARGET: pytorch_stage TORCHVISION_VERSION: 0.15.2 +- AWS_OFI_NCCL_VERSION: v1.7.4-aws + BASE_IMAGE: nvidia/cuda:12.1.0-cudnn8-devel-ubuntu20.04 + CUDA_VERSION: 12.1.0 + IMAGE_NAME: torch-nightly-2-3-0-20240110-cu121-python3-10 + MOFED_VERSION: '' + NVIDIA_REQUIRE_CUDA_OVERRIDE: cuda>=12.1 brand=tesla,driver>=450,driver<451 brand=tesla,driver>=470,driver<471 + brand=unknown,driver>=470,driver<471 brand=nvidia,driver>=470,driver<471 brand=nvidiartx,driver>=470,driver<471 + brand=geforce,driver>=470,driver<471 brand=geforcertx,driver>=470,driver<471 brand=quadro,driver>=470,driver<471 + brand=quadrortx,driver>=470,driver<471 brand=titan,driver>=470,driver<471 brand=titanrtx,driver>=470,driver<471 + brand=tesla,driver>=510,driver<511 brand=unknown,driver>=510,driver<511 brand=nvidia,driver>=510,driver<511 + brand=nvidiartx,driver>=510,driver<511 brand=geforce,driver>=510,driver<511 brand=geforcertx,driver>=510,driver<511 + brand=quadro,driver>=510,driver<511 brand=quadrortx,driver>=510,driver<511 brand=titan,driver>=510,driver<511 + brand=titanrtx,driver>=510,driver<511 brand=tesla,driver>=515,driver<516 brand=unknown,driver>=515,driver<516 + brand=nvidia,driver>=515,driver<516 brand=nvidiartx,driver>=515,driver<516 brand=geforce,driver>=515,driver<516 + brand=geforcertx,driver>=515,driver<516 brand=quadro,driver>=515,driver<516 brand=quadrortx,driver>=515,driver<516 + brand=titan,driver>=515,driver<516 brand=titanrtx,driver>=515,driver<516 brand=tesla,driver>=525,driver<526 + brand=unknown,driver>=525,driver<526 brand=nvidia,driver>=525,driver<526 brand=nvidiartx,driver>=525,driver<526 + brand=geforce,driver>=525,driver<526 brand=geforcertx,driver>=525,driver<526 brand=quadro,driver>=525,driver<526 + brand=quadrortx,driver>=525,driver<526 brand=titan,driver>=525,driver<526 brand=titanrtx,driver>=525,driver<526 + PYTHON_VERSION: '3.10' + PYTORCH_NIGHTLY_URL: https://download.pytorch.org/whl/nightly/cu121 + PYTORCH_NIGHTLY_VERSION: dev20240110+cu121 + PYTORCH_VERSION: 2.3.0 + TAGS: + - mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04 + TARGET: pytorch_stage + TORCHVISION_VERSION: 0.18.0 - AWS_OFI_NCCL_VERSION: '' BASE_IMAGE: nvidia/cuda:12.1.0-cudnn8-devel-ubuntu20.04 CUDA_VERSION: 12.1.0 diff --git a/docker/generate_build_matrix.py b/docker/generate_build_matrix.py index 3c85993b20..1d895cbf2e 100644 --- a/docker/generate_build_matrix.py +++ b/docker/generate_build_matrix.py @@ -224,6 +224,23 @@ def _main(): pytorch_entries.append(entry) + nightly_entry_310_aws = { + 'AWS_OFI_NCCL_VERSION': 'v1.7.4-aws', + 'BASE_IMAGE': 'nvidia/cuda:12.1.0-cudnn8-devel-ubuntu20.04', + 'CUDA_VERSION': '12.1.0', + 'IMAGE_NAME': 'torch-nightly-2-3-0-20240110-cu121-python3-10', + 'MOFED_VERSION': '', + 'NVIDIA_REQUIRE_CUDA_OVERRIDE': _get_cuda_override('12.1.0'), + 'PYTHON_VERSION': '3.10', + 'PYTORCH_VERSION': '2.3.0', + 'PYTORCH_NIGHTLY_URL': 'https://download.pytorch.org/whl/nightly/cu121', + 'PYTORCH_NIGHTLY_VERSION': 'dev20240110+cu121', + 'TAGS': ['mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04'], + 'TARGET': 'pytorch_stage', + 'TORCHVISION_VERSION': '0.18.0' + } + pytorch_entries.append(nightly_entry_310_aws) + nightly_entry_310 = { 'AWS_OFI_NCCL_VERSION': '', 'BASE_IMAGE': 'nvidia/cuda:12.1.0-cudnn8-devel-ubuntu20.04', From 4205425f91a801ac15370974fa99d54d5fb3a8ec Mon Sep 17 00:00:00 2001 From: Charles Tang Date: Tue, 13 Feb 2024 10:39:30 -0800 Subject: [PATCH 13/19] Patch torch 2.3 aws naming (#3006) * commit change * commit change --- docker/README.md | 2 +- docker/build_matrix.yaml | 4 ++-- docker/generate_build_matrix.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docker/README.md b/docker/README.md index 8ea2217bc9..c617567f2f 100644 --- a/docker/README.md +++ b/docker/README.md @@ -32,7 +32,7 @@ To install composer, once inside the image, run `pip install mosaicml`. |----------------|----------|-------------------|---------------------|------------------|------------------------------------------------------------------------------------------| | Ubuntu 20.04 | Base | 2.3.0 | 12.1.0 (Infiniband) | 3.11 | `mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.11-ubuntu20.04` | | Ubuntu 20.04 | Base | 2.3.0 | 12.1.0 (Infiniband) | 3.10 | `mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04` | -| Ubuntu 20.04 | Base | 2.3.0 | 12.1.0 (EFA) | 3.10 | `mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04` | +| Ubuntu 20.04 | Base | 2.3.0 | 12.1.0 (EFA) | 3.10 | `mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04-aws` | | Ubuntu 20.04 | Base | 2.2.0 | 12.1.0 (Infiniband) | 3.11 | `mosaicml/pytorch:2.2.0_cu121-python3.11-ubuntu20.04` | | Ubuntu 20.04 | Base | 2.2.0 | 12.1.0 (EFA) | 3.11 | `mosaicml/pytorch:2.2.0_cu121-python3.11-ubuntu20.04-aws` | | Ubuntu 20.04 | Base | 2.2.0 | cpu | 3.11 | `mosaicml/pytorch:2.2.0_cpu-python3.11-ubuntu20.04` | diff --git a/docker/build_matrix.yaml b/docker/build_matrix.yaml index e6c744847d..700bd4c010 100644 --- a/docker/build_matrix.yaml +++ b/docker/build_matrix.yaml @@ -193,7 +193,7 @@ - AWS_OFI_NCCL_VERSION: v1.7.4-aws BASE_IMAGE: nvidia/cuda:12.1.0-cudnn8-devel-ubuntu20.04 CUDA_VERSION: 12.1.0 - IMAGE_NAME: torch-nightly-2-3-0-20240110-cu121-python3-10 + IMAGE_NAME: torch-nightly-2-3-0-20240110-cu121-python3-10-aws MOFED_VERSION: '' NVIDIA_REQUIRE_CUDA_OVERRIDE: cuda>=12.1 brand=tesla,driver>=450,driver<451 brand=tesla,driver>=470,driver<471 brand=unknown,driver>=470,driver<471 brand=nvidia,driver>=470,driver<471 brand=nvidiartx,driver>=470,driver<471 @@ -214,7 +214,7 @@ PYTORCH_NIGHTLY_VERSION: dev20240110+cu121 PYTORCH_VERSION: 2.3.0 TAGS: - - mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04 + - mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04-aws TARGET: pytorch_stage TORCHVISION_VERSION: 0.18.0 - AWS_OFI_NCCL_VERSION: '' diff --git a/docker/generate_build_matrix.py b/docker/generate_build_matrix.py index 1d895cbf2e..333010304b 100644 --- a/docker/generate_build_matrix.py +++ b/docker/generate_build_matrix.py @@ -228,14 +228,14 @@ def _main(): 'AWS_OFI_NCCL_VERSION': 'v1.7.4-aws', 'BASE_IMAGE': 'nvidia/cuda:12.1.0-cudnn8-devel-ubuntu20.04', 'CUDA_VERSION': '12.1.0', - 'IMAGE_NAME': 'torch-nightly-2-3-0-20240110-cu121-python3-10', + 'IMAGE_NAME': 'torch-nightly-2-3-0-20240110-cu121-python3-10-aws', 'MOFED_VERSION': '', 'NVIDIA_REQUIRE_CUDA_OVERRIDE': _get_cuda_override('12.1.0'), 'PYTHON_VERSION': '3.10', 'PYTORCH_VERSION': '2.3.0', 'PYTORCH_NIGHTLY_URL': 'https://download.pytorch.org/whl/nightly/cu121', 'PYTORCH_NIGHTLY_VERSION': 'dev20240110+cu121', - 'TAGS': ['mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04'], + 'TAGS': ['mosaicml/pytorch:2.3.0_cu121-nightly20240110-python3.10-ubuntu20.04-aws'], 'TARGET': 'pytorch_stage', 'TORCHVISION_VERSION': '0.18.0' } From 953ee888e95b337046e0d2933374dd45258e7145 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 13 Feb 2024 13:49:46 -0500 Subject: [PATCH 14/19] start training loop (#3005) --- composer/trainer/trainer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/composer/trainer/trainer.py b/composer/trainer/trainer.py index ebadb58944..b7c9bd4d4a 100644 --- a/composer/trainer/trainer.py +++ b/composer/trainer/trainer.py @@ -2086,6 +2086,7 @@ def _train_loop(self) -> None: # asserted to be not None when Trainer.fit() is called raise RuntimeError('max_duration must be specified when initializing the Trainer') + log.debug('Starting training loop') while self.state.timestamp < self.state.max_duration: if int(self.state.timestamp.batch_in_epoch) == 0: self.engine.run_event(Event.EPOCH_START) From 157af107def3b76a29597c8a7c93e74a30bfdd0b Mon Sep 17 00:00:00 2001 From: Charles Tang Date: Tue, 13 Feb 2024 11:56:26 -0800 Subject: [PATCH 15/19] Deprecate ffcv script (#3007) --- scripts/ffcv/create_ffcv_datasets.py | 148 --------------------------- 1 file changed, 148 deletions(-) delete mode 100644 scripts/ffcv/create_ffcv_datasets.py diff --git a/scripts/ffcv/create_ffcv_datasets.py b/scripts/ffcv/create_ffcv_datasets.py deleted file mode 100644 index 190974c762..0000000000 --- a/scripts/ffcv/create_ffcv_datasets.py +++ /dev/null @@ -1,148 +0,0 @@ -# Copyright 2022 MosaicML Composer authors -# SPDX-License-Identifier: Apache-2.0 - -"""Helper utilities to create FFCV datasets.""" - -import logging -import os -import sys -import textwrap -from argparse import ArgumentParser -from io import BytesIO -from typing import Tuple - -import numpy as np -import torch -from PIL import Image -from torch.utils.data import Subset -from torchvision import transforms -from torchvision.datasets import CIFAR10, ImageFolder -from tqdm import tqdm - -from composer.datasets.ffcv_utils import write_ffcv_dataset - -log = logging.getLogger(__name__) - - -def _get_parser(): - parser = ArgumentParser(description='Utility for converting datasets to ffcv format.') - - parser.add_argument('--dataset', - type=str, - default='cifar10', - choices=['cifar10', 'imagenet1k'], - help=textwrap.dedent("""\ - Dataset to use. Default: cifar10""")) - parser.add_argument('--remote', - type=str, - help=textwrap.dedent("""\ - Remote directory (S3 or local filesystem) where dataset is stored., Example: s3://my-s3-bucket-name""" - )) - parser.add_argument('--local', - type=str, - default=None, - help=textwrap.dedent("""\ - Local filesystem directory where dataset is cached during operation. Default: None""")) - parser.add_argument('--split', - type=str, - default='train', - choices=['train', 'val'], - help=textwrap.dedent("""\ - Split to use. Default: train""")) - - parser.add_argument('--datadir', - type=str, - default=None, - help=textwrap.dedent("""\ - Location of the dataset. Default: None""")) - - parser.add_argument('--download', - type=bool, - default=False, - help=textwrap.dedent("""\ - Download the dataset if possible. Default: False""")) - - parser.add_argument('--write_path', - type=str, - default=None, - help=textwrap.dedent("""\ - File path to use for writing the dataset. Default: /tmp/_.ffcv""")) - - parser.add_argument('--write_mode', - type=str, - default='proportion', - choices=['raw', 'jpg', 'smart', 'proportion'], - help=textwrap.dedent("""\ - Write mode to use. raw is uint8 values, jpg is jpeg compressed images, smart is - compressing based on image size and proportion is according to the given - compress_probability. Default: proportion""")) - - parser.add_argument('--max_resolution', type=int, default=500, help='Max resoultion for images.') - - parser.add_argument('--num_workers', type=int, default=64, help='Number of workers to use.') - - parser.add_argument('--chunk_size', type=int, default=100, help='Chunk size to use.') - - parser.add_argument('--jpeg_quality', type=int, default=90, help='Quality of jpeg.') - - parser.add_argument('--subset', type=int, default=-1, help='Only use a subset of dataset.') - - parser.add_argument('--compress_probability', - type=float, - required=False, - default=0.50, - help='Compress the given fraction of images to jpeg while writing the ffcv dataset.') - return parser - - -def _parse_args(): - parser = _get_parser() - - args = parser.parse_args() - - if args.datadir is not None: - log.info(f'Will read from local directory: {args.datadir}.') - else: - if args.local is None: - args.local = f'/tmp/mds-cache/mds-{args.dataset}/' - - if args.remote.startswith('s3://'): - log.info(f'Will read from remote: {args.remote}.') - else: - log.info(f'Will read from local: {args.remote}.') - - if args.write_path is None: - args.write_path = f'/tmp/{args.dataset}_{args.split}.ffcv' - - if os.path.exists(args.write_path): - log.error(f'Destination already exists: {args.write_path}') - sys.exit(-1) - - return args - - -def _main(): - args = _parse_args() - - if args.dataset == 'cifar10': - dataset = CIFAR10(root=args.datadir, train=(args.split == 'train'), download=args.download) - elif args.dataset == 'imagenet1k': - dataset = ImageFolder(os.path.join(args.datadir, args.split)) - else: - raise ValueError(f'Unsupported dataset: {args.dataset}. Checkout the list of supported datasets with -h') - - if args.subset > 0: - dataset = Subset(dataset, range(args.subset)) - - write_ffcv_dataset(dataset=dataset, - write_path=args.write_path, - max_resolution=args.max_resolution, - num_workers=args.num_workers, - write_mode=args.write_mode, - compress_probability=args.compress_probability, - jpeg_quality=args.jpeg_quality, - chunk_size=args.chunk_size) - - -if __name__ == '__main__': - sys.exit(_main()) From 30e6525ef4fe223fb387eb3ba517314115baabb0 Mon Sep 17 00:00:00 2001 From: Jesse Chan Date: Tue, 13 Feb 2024 14:01:41 -0800 Subject: [PATCH 16/19] Race condition fix in checkpoint loading util (#3001) * HSDP fix race condition * bug fix * clean up * update comments * Update composer/utils/checkpoint.py Co-authored-by: Mihir Patel * bug fix * linter * Update composer/utils/checkpoint.py Co-authored-by: Mihir Patel * Update composer/utils/checkpoint.py Co-authored-by: Mihir Patel * Update composer/utils/checkpoint.py Co-authored-by: Mihir Patel --------- Co-authored-by: Mihir Patel --- composer/utils/checkpoint.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/composer/utils/checkpoint.py b/composer/utils/checkpoint.py index ad814f84f4..a50a2db27d 100644 --- a/composer/utils/checkpoint.py +++ b/composer/utils/checkpoint.py @@ -217,17 +217,33 @@ def read_data(self, plan: LoadPlan, planner: LoadPlanner): first_replica = self.device_mesh is None or self.device_mesh.ndim == 1 or ( self.device_mesh.ndim >= 2 and self.device_mesh.get_local_rank(mesh_dim=0) == 0) - # 1. Download to the destination all files this rank needs if on first replica + # 1. Collect the relative paths to download for all ranks for deduplication + relative_file_paths = set() + for plan_item in plan.items: + relative_file_paths.add(self.storage_data[plan_item.storage_index].relative_path) + all_file_paths = dist.all_gather_object(relative_file_paths) + + # 2. Download to the destination all files this rank needs if on first replica if first_replica: log.debug(f'Rank {dist.get_global_rank()} starting to download files.') + + # Get the lowest rank in the current node + local_rank_0 = dist.get_global_rank() - dist.get_local_rank() + for plan_item in plan.items: - # Each plan item has a storage index which points to the relative path of the shard file at save time. relative_file_path = self.storage_data[plan_item.storage_index].relative_path + # Check if the file is scheduled to be downloaded by a lower rank on the same node + # i.e. if rank 0 and rank 1 on the same node have the same the same required file, + # only rank 0 should download it and not rank 1. + is_downloaded = any( + relative_file_path in all_file_paths[i] for i in range(local_rank_0, dist.get_global_rank())) + # Download the shard file to the relative path it's associated to and save that relative path # to the root directory specified to the FileSystem reader constructor. file_destination = str(Path(self.destination_path) / Path(relative_file_path)) + # The file could have already been downloaded as different plan items can point to same file. - if not os.path.exists(file_destination): + if not is_downloaded and not os.path.exists(file_destination): log.debug(f'Downloading {relative_file_path} to {file_destination}.') object_name = str(Path(self.source_path) / Path(relative_file_path)) if isinstance(self.object_store, ObjectStore): @@ -242,12 +258,12 @@ def read_data(self, plan: LoadPlan, planner: LoadPlanner): ) log.debug(f'Finished downloading {relative_file_path} to {file_destination}.') - # 2. Wait for all ranks to finish. + # 3. Wait for all ranks to finish. log.debug(f'Rank {dist.get_global_rank()} finished downloading all files.') dist.barrier() log.debug('Done waiting for all ranks to finish downloading files.') - # 3. Broadcast files to all other replicas if HSDP + # 4. Broadcast files to all other replicas if HSDP if self.device_mesh is not None and self.device_mesh.ndim == 2: # Broadcast file to all replicas replicate_process_group = self.device_mesh.get_group(0) @@ -288,7 +304,7 @@ def read_data(self, plan: LoadPlan, planner: LoadPlanner): f'Done waiting for all ranks to finish transferring files. Local checkpoint files: {os.listdir(self.destination_path)}' ) - # 4. Piggyback off of the FileSystemReader to read all the files now that they are downloaded. + # 5. Piggyback off of the FileSystemReader to read all the files now that they are downloaded. return super().read_data(plan, planner) From 83257c57602de49e3073145bb94ae2ac07d3acc9 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 13 Feb 2024 17:17:42 -0500 Subject: [PATCH 17/19] remove log (#3008) --- composer/loggers/mosaicml_logger.py | 1 - 1 file changed, 1 deletion(-) diff --git a/composer/loggers/mosaicml_logger.py b/composer/loggers/mosaicml_logger.py index d4338a407a..cab710a4c3 100644 --- a/composer/loggers/mosaicml_logger.py +++ b/composer/loggers/mosaicml_logger.py @@ -162,7 +162,6 @@ def _flush_metadata(self, force_flush: bool = False, future: bool = True) -> Non self.buffered_metadata = {} self.time_last_logged = time.time() done, incomplete = wait(self._futures, timeout=0.01) - log.info(f'Logged {len(done)} metadata to MosaicML, waiting on {len(incomplete)}') # Raise any exceptions for f in done: if f.exception() is not None: From f9c485c5fd343a5d0411cf0d373410b088b271b8 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Tue, 13 Feb 2024 19:57:10 -0500 Subject: [PATCH 18/19] Only save RNG on rank 0 (#2998) * v1 * save on rank 0 only * add back w deprecation * remove comment --- composer/callbacks/checkpoint_saver.py | 12 +++++----- composer/core/state.py | 9 ++++---- composer/trainer/trainer.py | 2 +- composer/utils/checkpoint.py | 32 ++++++++++++-------------- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/composer/callbacks/checkpoint_saver.py b/composer/callbacks/checkpoint_saver.py index 6fa64715c8..0ce6f77d90 100644 --- a/composer/callbacks/checkpoint_saver.py +++ b/composer/callbacks/checkpoint_saver.py @@ -394,7 +394,7 @@ def _save_checkpoint(self, state: State, logger: Logger): return metadata_local_file_path = None - if dist.get_global_rank() == 0 and state.fsdp_elastic_sharded_enabled: + if dist.get_global_rank() == 0 and state.fsdp_sharded_state_dict_enabled: metadata_local_file_path = format_name_with_dist_and_time( os.path.join(Path(saved_path).parent, _TORCH_DISTRIBUTED_CHECKPOINTS_METADATA_FILENAME), state.run_name, state.timestamp) @@ -407,11 +407,11 @@ def _save_checkpoint(self, state: State, logger: Logger): except FileNotFoundError: pass # Sharded checkpoints for torch >2.0 use directories not files for load_paths - if state.fsdp_elastic_sharded_enabled: + if state.fsdp_sharded_state_dict_enabled: src_path = str(pathlib.Path(saved_path).parent) else: src_path = saved_path - this_rank_saves_symlinks = dist.get_global_rank() == 0 or not state.fsdp_elastic_sharded_enabled + this_rank_saves_symlinks = dist.get_global_rank() == 0 or not state.fsdp_sharded_state_dict_enabled if this_rank_saves_symlinks: os.symlink(os.path.relpath(src_path, os.path.dirname(symlink)), symlink) @@ -430,7 +430,7 @@ def _save_checkpoint(self, state: State, logger: Logger): remote_file_name = format_name_with_dist_and_time(remote_file_name, state.run_name, state.timestamp) # Upload metadata file. # The metadata file contains info related to which shards are saved where. - if dist.get_global_rank() == 0 and state.fsdp_elastic_sharded_enabled: + if dist.get_global_rank() == 0 and state.fsdp_sharded_state_dict_enabled: metadata_remote_file_name = format_name_with_dist_and_time( os.path.join(Path(remote_file_name).parent, _TORCH_DISTRIBUTED_CHECKPOINTS_METADATA_FILENAME), state.run_name, state.timestamp) @@ -463,12 +463,12 @@ def _save_checkpoint(self, state: State, logger: Logger): with tempfile.TemporaryDirectory() as tmpdir: symlink_filename = os.path.join(tmpdir, 'latest.symlink') # Sharded checkpoints for torch >2.0 use directories not files for load_paths - if state.fsdp_elastic_sharded_enabled: + if state.fsdp_sharded_state_dict_enabled: src_path = str(pathlib.Path(remote_file_name).parent) else: src_path = remote_file_name log.debug(f'Creating symlink file {symlink_filename} -> {src_path}') - this_rank_saves_symlinks = dist.get_global_rank() == 0 or not state.fsdp_elastic_sharded_enabled + this_rank_saves_symlinks = dist.get_global_rank() == 0 or not state.fsdp_sharded_state_dict_enabled if this_rank_saves_symlinks: create_symlink_file(src_path, symlink_filename) logger.upload_file( diff --git a/composer/core/state.py b/composer/core/state.py index dcb180f4b4..cc97cb8391 100644 --- a/composer/core/state.py +++ b/composer/core/state.py @@ -731,6 +731,11 @@ def fsdp_state_dict_type(self): def fsdp_sharded_state_dict_enabled(self): return self.fsdp_config is not None and self.fsdp_enabled and self.fsdp_state_dict_type == 'sharded' + @property + def fsdp_elastic_sharded_enabled(self): + warnings.warn('state.fsdp_elastic_sharded_enabled is deprecated and will be removed v0.21.0') + return self.fsdp_sharded_state_dict_enabled + @property def fsdp_device_mesh(self): if self.fsdp_enabled: @@ -745,10 +750,6 @@ def load_fsdp_monolith_rank0_only(self): return self.fsdp_config is not None and self.fsdp_auto_wrap and self.fsdp_config[ 'state_dict_type'] == 'full' and self.fsdp_config['load_monolith_rank0_only'] == True - @property - def fsdp_elastic_sharded_enabled(self): - return self.fsdp_sharded_state_dict_enabled - def _get_integrations_state_dict(self) -> Dict[str, Any]: """Gets a dictionary of information about integrations to store in the state dict. diff --git a/composer/trainer/trainer.py b/composer/trainer/trainer.py index b7c9bd4d4a..01cd0fcc9b 100644 --- a/composer/trainer/trainer.py +++ b/composer/trainer/trainer.py @@ -1429,7 +1429,7 @@ def __init__( 'Multiple concurrent uploads is not currently supported when using autoresume. Please set `num_concurrent_uploads` to 1 ' 'for all `RemoteUploaderDownloader` instances.') assert latest_remote_file_name is not None - if self.state.fsdp_elastic_sharded_enabled: + if self.state.fsdp_sharded_state_dict_enabled: ar_object_store = maybe_create_object_store_from_uri(save_folder) # Symlink is on object store. if ar_object_store is not None: diff --git a/composer/utils/checkpoint.py b/composer/utils/checkpoint.py index a50a2db27d..c1fe956192 100644 --- a/composer/utils/checkpoint.py +++ b/composer/utils/checkpoint.py @@ -464,13 +464,13 @@ def load_checkpoint( """ path = partial_format(path, run_name=state.run_name) using_legacy_sharded = False - if state.fsdp_elastic_sharded_enabled: + if state.fsdp_sharded_state_dict_enabled: assert object_store is None or isinstance( object_store, ObjectStore), 'For loading sharded checkpoints load_object_store must be set with the class ObjectStore' using_legacy_sharded = is_checkpoint_legacy_sharded(object_store, path) - if state.fsdp_elastic_sharded_enabled and not using_legacy_sharded: + if state.fsdp_sharded_state_dict_enabled and not using_legacy_sharded: rng_state_dicts = load_sharded_checkpoint( source_path=path, state=state, @@ -1013,16 +1013,18 @@ def _save_checkpoint( state_dict['state'] = state_dict.get('state', {}) if state.fsdp_sharded_state_dict_enabled: + # Only rank 0 saves RNG + if dist.get_global_rank() > 0: + state_dict.pop('rng') # To load optimizer states with 2.0 <= torch < 2.2.9 , the optimizer state must be at the top # level of the state dict because the load_sharded_optimizer_state_dict function # requires a top level state dict key for the optimizer. # See https://github.com/pytorch/pytorch/blob/v2.0.1/torch/distributed/checkpoint/optimizer.py#L271 # for more info. - if version.parse(torch.__version__) < version.parse('2.2.9'): - if not weights_only: - state_dict['optimizers'] = state_dict['state'].pop('optimizers') - log.debug('State dict created.') + if version.parse(torch.__version__) < version.parse('2.2.9') and not weights_only: + state_dict['optimizers'] = state_dict['state'].pop('optimizers') + log.debug('State dict created.') dirname = os.path.dirname(save_filename) if dirname: os.makedirs(dirname, exist_ok=True) @@ -1030,7 +1032,7 @@ def _save_checkpoint( # Only some ranks are meant to save checkpoint and produce a file expect_file = False - # All ranks save for deepspeed + # Save deepspeed checkpoint if is_deepspeed: expect_file = True log.debug('Saving deepspeed checkpoints to %s...', save_filename) @@ -1041,9 +1043,8 @@ def _save_checkpoint( _compress_file(save_filename, basename=_COMPOSER_STATES_FILENAME) _save_deepspeed_model(state.deepspeed_model, save_filename) - - # Sharded checkpointing - elif state.fsdp_elastic_sharded_enabled: + # Save sharded checkpoint + elif state.fsdp_sharded_state_dict_enabled: if state.fsdp_config is None: raise ValueError('Saving a sharded checkpoint requires passing an FSDP config to Trainer.') @@ -1075,24 +1076,21 @@ def _save_checkpoint( process_group=process_group, ) log.debug('Finished pytorch save state dict') - - # Only rank 0 saves the state_dict unless you are using sharded checkpointing with torch <2.0 - elif dist.get_global_rank() == 0 or state.fsdp_sharded_state_dict_enabled: + # Save monolith checkpoint + elif dist.get_global_rank() == 0: expect_file = True - log_msg = f'Saving sharded checkpoints to {save_filename}...' if state.fsdp_sharded_state_dict_enabled else f'Saving monolithic checkpoint to {save_filename}' with open(save_filename, 'wb') as f: - log.debug(log_msg) + log.debug(f'Saving monolithic checkpoint to {save_filename}') torch.save(state_dict, f) log.debug(f'Global rank 0 done saving checkpoint to disk at {save_filename}.') if is_tar(save_filename): _compress_file(save_filename, basename=_COMPOSER_STATES_FILENAME) - else: log.debug(f'Only rank 0 is saving a checkpoint, so rank {dist.get_global_rank()} skips checkpointing.') - dist.barrier() # ensure all ranks saved their files + dist.barrier() # Ensure all ranks saved their files if expect_file: assert os.path.exists(save_filename), 'Expected file to have been saved.' From 9e60fa33866bf26bddd66d9e9830bd23dca3c77d Mon Sep 17 00:00:00 2001 From: bigning Date: Wed, 14 Feb 2024 09:33:55 -0800 Subject: [PATCH 19/19] [EASY] Always log 1st batch when resuming training (#3009) * logging 1st batch regardless * comments --- composer/loggers/console_logger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer/loggers/console_logger.py b/composer/loggers/console_logger.py index 4e6fc8a3d8..8cfd470b02 100644 --- a/composer/loggers/console_logger.py +++ b/composer/loggers/console_logger.py @@ -109,7 +109,7 @@ def epoch_end(self, state: State, logger: Logger) -> None: cur_epoch = int(state.timestamp.epoch) # epoch gets incremented right before EPOCH_END unit = self.log_interval.unit - if unit == TimeUnit.EPOCH and (cur_epoch % int(self.log_interval) == 0 or cur_epoch == 1): + if unit == TimeUnit.EPOCH and (cur_epoch % int(self.log_interval) == 0 or self.last_logged_batch == 0): self.log_to_console(self.logged_metrics, prefix='Train ', state=state) self.last_logged_batch = int(state.timestamp.batch) self.logged_metrics = {} # Clear logged metrics. @@ -117,7 +117,7 @@ def epoch_end(self, state: State, logger: Logger) -> None: def batch_end(self, state: State, logger: Logger) -> None: cur_batch = int(state.timestamp.batch) unit = self.log_interval.unit - if unit == TimeUnit.BATCH and (cur_batch % int(self.log_interval) == 0 or cur_batch == 1): + if unit == TimeUnit.BATCH and (cur_batch % int(self.log_interval) == 0 or self.last_logged_batch == 0): self.log_to_console(self.logged_metrics, prefix='Train ', state=state) self.last_logged_batch = cur_batch self.logged_metrics = {} # Clear logged metrics.