Commit 146ebae

Merge branch 'mvpatel2000/fix-cpu-tests' of github.com-mvpatel2000:mvpatel2000/composer into mvpatel2000/fix-cpu-tests
mvpatel2000 committed Sep 23, 2024
2 parents 7726818 + ed1efa3 commit 146ebae
Showing 13 changed files with 184 additions and 138 deletions.
18 changes: 9 additions & 9 deletions .github/workflows/daily.yaml
@@ -31,17 +31,17 @@ jobs:
pytest_command: coverage run -m pytest
composer_package_name: mosaicml
- name: cpu-3.11-2.4
container: mosaicml/pytorch:2.4.0_cpu-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cpu-python3.11-ubuntu20.04
markers: not daily and (remote or not remote) and not gpu and not doctest
pytest_command: coverage run -m pytest
composer_package_name: mosaicml
- name: cpu-3.11-2.4-composer
container: mosaicml/pytorch:2.4.0_cpu-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cpu-python3.11-ubuntu20.04
markers: not daily and (remote or not remote) and not gpu and not doctest
pytest_command: coverage run -m pytest
composer_package_name: composer
- name: cpu-doctest
container: mosaicml/pytorch:2.4.0_cpu-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cpu-python3.11-ubuntu20.04
markers: not daily and (remote or not remote) and not gpu and doctest
pytest_command: coverage run -m pytest tests/test_docs.py
composer_package_name: mosaicml
@@ -56,17 +56,17 @@ jobs:
pytest_command: coverage run -m pytest
composer_package_name: mosaicml
- name: daily-cpu-3.11-2.4
container: mosaicml/pytorch:2.4.0_cpu-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cpu-python3.11-ubuntu20.04
markers: daily and (remote or not remote) and not gpu and not doctest
pytest_command: coverage run -m pytest
composer_package_name: mosaicml
- name: daily-cpu-3.11-2.4-composer
container: mosaicml/pytorch:2.4.0_cpu-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cpu-python3.11-ubuntu20.04
markers: daily and (remote or not remote) and not gpu and not doctest
pytest_command: coverage run -m pytest
composer_package_name: composer
- name: daily-cpu-doctest
container: mosaicml/pytorch:2.4.0_cpu-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cpu-python3.11-ubuntu20.04
markers: daily and (remote or not remote) and not gpu and doctest
pytest_command: coverage run -m pytest tests/test_docs.py
composer_package_name: mosaicml
@@ -120,7 +120,7 @@ jobs:
composer_package_name: "mosaicml"
gpu_num: 1
- name: "gpu-3.11-2.4-1-gpu"
container: mosaicml/pytorch:2.4.0_cu124-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cu124-python3.11-ubuntu20.04
markers: "(daily or not daily) and (remote or not remote) and gpu and (doctest or not doctest)"
pytest_command: "coverage run -m pytest"
composer_package_name: "mosaicml"
@@ -138,7 +138,7 @@ jobs:
composer_package_name: "mosaicml"
gpu_num: 2
- name: "gpu-3.11-2.4-2-gpu"
container: mosaicml/pytorch:2.4.0_cu124-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cu124-python3.11-ubuntu20.04
markers: "(daily or not daily) and (remote or not remote) and gpu and (doctest or not doctest)"
pytest_command: "coverage run -m pytest"
composer_package_name: "mosaicml"
@@ -156,7 +156,7 @@ jobs:
composer_package_name: "mosaicml"
gpu_num: 4
- name: "gpu-3.11-2.4-4-gpu"
container: mosaicml/pytorch:2.4.0_cu124-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cu124-python3.11-ubuntu20.04
markers: "(daily or not daily) and (remote or not remote) and gpu and (doctest or not doctest)"
pytest_command: "coverage run -m pytest"
composer_package_name: "mosaicml"
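Each matrix entry pairs a `markers` expression with a `pytest_command`. A rough local equivalent of the cpu-3.11-2.4 job's test selection, with coverage collection omitted and the marker expression copied from the workflow, is:

import pytest

# Select the same tests the CI job runs: everything not marked daily, gpu,
# or doctest. "(remote or not remote)" is a tautology that keeps both remote
# and non-remote tests in scope.
exit_code = pytest.main(['-m', 'not daily and (remote or not remote) and not gpu and not doctest'])
raise SystemExit(exit_code)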
4 changes: 2 additions & 2 deletions .github/workflows/pr-cpu.yaml
@@ -25,11 +25,11 @@ jobs:
markers: not daily and not remote and not gpu and not doctest
pytest_command: coverage run -m pytest
- name: cpu-3.11-2.4
container: mosaicml/pytorch:2.4.0_cpu-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cpu-python3.11-ubuntu20.04
markers: not daily and not remote and not gpu and not doctest
pytest_command: coverage run -m pytest
- name: cpu-doctest
container: mosaicml/pytorch:2.4.0_cpu-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cpu-python3.11-ubuntu20.04
markers: not daily and not remote and not gpu and doctest
pytest_command: coverage run -m pytest tests/test_docs.py
steps:
6 changes: 3 additions & 3 deletions .github/workflows/pr-gpu.yaml
@@ -16,7 +16,7 @@ jobs:
matrix:
include:
- name: gpu-3.11-2.4-1
container: mosaicml/pytorch:2.4.0_cu124-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cu124-python3.11-ubuntu20.04
markers: not daily and not remote and gpu and (doctest or not doctest)
pytest_command: coverage run -m pytest
composer_package_name: mosaicml
@@ -45,7 +45,7 @@ jobs:
matrix:
include:
- name: gpu-3.11-2.4-2
container: mosaicml/pytorch:2.4.0_cu124-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cu124-python3.11-ubuntu20.04
markers: not daily and not remote and gpu and (doctest or not doctest)
pytest_command: coverage run -m pytest
composer_package_name: mosaicml
@@ -75,7 +75,7 @@ jobs:
matrix:
include:
- name: gpu-3.11-2.4-4
container: mosaicml/pytorch:2.4.0_cu124-python3.11-ubuntu20.04
container: mosaicml/pytorch:2.4.1_cu124-python3.11-ubuntu20.04
markers: not daily and not remote and gpu and (doctest or not doctest)
pytest_command: coverage run -m pytest
composer_package_name: mosaicml
22 changes: 10 additions & 12 deletions composer/loggers/mlflow_logger.py
@@ -174,9 +174,9 @@ def __init__(

if log_system_metrics:
# Set system metrics sampling interval and samples before logging so that system metrics
# are collected every 5s, and aggregated over 3 samples before being logged
# (logging per 15s).
mlflow.set_system_metrics_samples_before_logging(3)
# are collected every 5s, and aggregated over 6 samples before being logged
# (logging per 30s).
mlflow.set_system_metrics_samples_before_logging(6)
mlflow.set_system_metrics_sampling_interval(5)

self._rank_zero_only = rank_zero_only
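
The two MLflow knobs compose: samples are taken every 5 seconds and aggregated over 6 samples, so system metrics are now emitted roughly every 30 seconds instead of every 15. A minimal standalone sketch, assuming an MLflow version with system-metrics support (the `start_run` call here is illustrative, not Composer's code path):

import mlflow

mlflow.set_system_metrics_sampling_interval(5)        # sample CPU/GPU/memory every 5 s
mlflow.set_system_metrics_samples_before_logging(6)   # aggregate 6 samples per log, i.e. ~30 s cadence

with mlflow.start_run(log_system_metrics=True):
    pass  # training would happen here; system metrics are logged in the background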
@@ -312,10 +312,7 @@ def init(self, state: State, logger: Logger) -> None:
if self.run_name is None:
self.run_name = state.run_name

if hasattr(state, 'device'):
self._global_exception_occurred = state.device.tensor_to_device(torch.tensor([0], dtype=torch.uint8),)
else:
self._global_exception_occurred = 0
self._global_exception_occurred = 0

# Store the Composer run name in the MLFlow run tags so it can be retrieved for autoresume
self.tags['run_name'] = os.environ.get('RUN_NAME', state.run_name)
@@ -545,7 +542,11 @@ def register_model_with_run_id(
"""
if self._enabled:
from mlflow.exceptions import MlflowException
from mlflow.protos.databricks_pb2 import ALREADY_EXISTS, RESOURCE_ALREADY_EXISTS, ErrorCode
from mlflow.protos.databricks_pb2 import (
ALREADY_EXISTS,
RESOURCE_ALREADY_EXISTS,
ErrorCode,
)

full_name = f'{self.model_registry_prefix}.{name}' if len(self.model_registry_prefix) > 0 else name

@@ -611,10 +612,7 @@ def post_close(self):
if hasattr(self, 'monitor_process'):
# Check if there is an uncaught exception, which means `post_close()` is triggered
# due to program crash.
if isinstance(self._global_exception_occurred, torch.Tensor):
finish_with_exception = (self._global_exception_occurred == 1).item()
else:
finish_with_exception = (self._global_exception_occurred == 1)
finish_with_exception = self._global_exception_occurred == 1
if finish_with_exception:
self.monitor_process.crash()
return
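
Together with the `init()` change above, the uncaught-exception flag is now a plain Python int on every rank rather than a rank-0 device tensor, so no `isinstance`/`.item()` branching is needed when it is read. A minimal sketch of the resulting lifecycle (the `on_exception` hook name is illustrative, not the logger's actual callback):

class ExceptionFlag:
    def init(self) -> None:
        self._global_exception_occurred = 0  # plain int, same type on every rank

    def on_exception(self) -> None:
        self._global_exception_occurred = 1  # set when an uncaught exception is observed

    def post_close(self) -> bool:
        # Single comparison; no tensor special case anymore.
        return self._global_exception_occurred == 1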
94 changes: 49 additions & 45 deletions composer/trainer/_patch_pytorch.py
@@ -946,51 +946,7 @@ def unshard_with_sync(self):
if version.parse(torch.__version__) >= version.parse('2.4.0') and version.parse(
torch.__version__,
) < version.parse('2.4.1'):
# Save original FlatParamHandle.unshard to revert back to when dropping automicrobatching hooks
from torch.distributed.fsdp._flat_param import FlatParamHandle
original_unshard = FlatParamHandle.unshard

@no_type_check
def unshard_with_sync(self):
"""Run the unshard logic, but with a sync after a :meth:`_alloc_padded_unsharded_flat_param`.
This prevents deadlocks when some ranks OOM after the alloc call and others do not.
This is a patched method from pytorch, meant to be called when automicrobatching
turns on hooks in its search process for the optimal non-OOMing microbatch size.
This includes all-gathering the flat parameter
and switching to using the unsharded flat parameter. If the handle does
not need unsharding, then this only switches to using the unsharded
flat parameter. For ``NO_SHARD``, this is a no-op.
If FSDP is in :meth:`summon_full_params` and the handle uses parameter
mixed precision, then the parameter is forced to full precision.
"""
if not self.needs_unshard():
# Even when not needing an unshard, we should switch to using
# the unsharded flat parameter
unsharded_flat_param = (
self._get_padded_unsharded_flat_param()
if self.uses_sharded_strategy
else self.flat_param
)
self._use_unsharded_flat_param(unsharded_flat_param)
return
unsharded_flat_param = self._alloc_padded_unsharded_flat_param()

# Check if any other rank hit an OOM
found_cuda_oom_tensor = torch.tensor([0], dtype=torch.uint8).to(self.device, non_blocking=True)

dist.all_reduce(found_cuda_oom_tensor, reduce_operation='MAX')
found_cuda_oom = found_cuda_oom_tensor.item()
# Signal current rank is still in batch
all_ranks_finished_tensor = torch.tensor([0], dtype=torch.uint8).to(self.device, non_blocking=True)

dist.all_reduce(all_ranks_finished_tensor, reduce_operation='MIN')

if found_cuda_oom == 1:
raise RuntimeError('CUDA out of memory encountered on a different rank')
padded_unsharded_flat_param = self._all_gather_flat_param(unsharded_flat_param)
self._use_unsharded_flat_param(padded_unsharded_flat_param)

# 2.4.0 only patch
# PyTorch issue: https://github.com/pytorch/pytorch/issues/133923
from torch.distributed.checkpoint.metadata import STATE_DICT_TYPE
from typing import Mapping, Collection
@@ -1046,3 +1002,51 @@ def _traverse_obj(path: OBJ_PATH, value: STATE_DICT_ITEM) -> None:

for key, value in state_dict.items():
_traverse_obj((str(key),), value)

if version.parse(torch.__version__) >= version.parse('2.4.0') and version.parse(
torch.__version__,
) < version.parse('2.4.2'):
# Save original FlatParamHandle.unshard to revert back to when dropping automicrobatching hooks
from torch.distributed.fsdp._flat_param import FlatParamHandle
original_unshard = FlatParamHandle.unshard

@no_type_check
def unshard_with_sync(self):
"""Run the unshard logic, but with a sync after a :meth:`_alloc_padded_unsharded_flat_param`.
This prevents deadlocks when some ranks OOM after the alloc call and others do not.
This is a patched method from pytorch, meant to be called when automicrobatching
turns on hooks in its search process for the optimal non-OOMing microbatch size.
This includes all-gathering the flat parameter
and switching to using the unsharded flat parameter. If the handle does
not need unsharding, then this only switches to using the unsharded
flat parameter. For ``NO_SHARD``, this is a no-op.
If FSDP is in :meth:`summon_full_params` and the handle uses parameter
mixed precision, then the parameter is forced to full precision.
"""
if not self.needs_unshard():
# Even when not needing an unshard, we should switch to using
# the unsharded flat parameter
unsharded_flat_param = (
self._get_padded_unsharded_flat_param()
if self.uses_sharded_strategy
else self.flat_param
)
self._use_unsharded_flat_param(unsharded_flat_param)
return
unsharded_flat_param = self._alloc_padded_unsharded_flat_param()

# Check if any other rank hit an OOM
found_cuda_oom_tensor = torch.tensor([0], dtype=torch.uint8).to(self.device, non_blocking=True)

dist.all_reduce(found_cuda_oom_tensor, reduce_operation='MAX')
found_cuda_oom = found_cuda_oom_tensor.item()
# Signal current rank is still in batch
all_ranks_finished_tensor = torch.tensor([0], dtype=torch.uint8).to(self.device, non_blocking=True)

dist.all_reduce(all_ranks_finished_tensor, reduce_operation='MIN')

if found_cuda_oom == 1:
raise RuntimeError('CUDA out of memory encountered on a different rank')
padded_unsharded_flat_param = self._all_gather_flat_param(unsharded_flat_param)
self._use_unsharded_flat_param(padded_unsharded_flat_param)
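
The body of `unshard_with_sync` is unchanged; only the version gate widens from `< 2.4.1` to `< 2.4.2`, so the patch also applies to PyTorch 2.4.1. The core synchronization idea, sketched standalone with plain `torch.distributed` (Composer's `dist` wrapper and the FSDP internals are replaced by a hypothetical helper):

import torch
import torch.distributed as dist


def raise_if_any_rank_oomed(local_oom: bool, device: torch.device) -> None:
    # Every rank contributes 0/1; a MAX all-reduce tells all ranks whether anyone
    # OOMed, so they can abort together instead of deadlocking in a later all-gather.
    flag = torch.tensor([1 if local_oom else 0], dtype=torch.uint8, device=device)
    dist.all_reduce(flag, op=dist.ReduceOp.MAX)
    if flag.item() == 1:
        raise RuntimeError('CUDA out of memory encountered on a different rank')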
87 changes: 55 additions & 32 deletions composer/utils/checkpoint.py
@@ -148,10 +148,25 @@ def _get_write_mode(name: str) -> str:
raise ValueError(f'{name} does not end with a valid tarfile extension.')


def _is_rng_key(key: str, value: tuple) -> bool:
"""Check if the key is an RNG key.
We expect the RNG key to be of the form 'rng.{rank}.cuda|torch|python|numpy'.
This function ensures that we don't accidentally pick up other keys.
"""
starts_with_rng = key.startswith('rng')
ends_with_expected = key.endswith(('cuda', 'torch', 'python', 'numpy'))
three_parts = isinstance(value, tuple) and len(value) == 3
if starts_with_rng and ends_with_expected and three_parts:
return True

return False


def _get_num_ranks_that_saved_rng(metadata: Metadata):
rng_inds = []
for field_name, field_value in metadata.planner_data.items():
if 'rng' in field_name:
if _is_rng_key(field_name, field_value):
_, rng_rank_index, _ = field_value
rng_inds.append(rng_rank_index)
rng_inds = set(rng_inds)
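
The stricter check only treats keys shaped like `rng.{rank}.{backend}` with a 3-tuple value as RNG entries; the old substring test (`'rng' in field_name`) could also match unrelated keys. Illustrative calls (the non-RNG key and the tuple values are made up):

_is_rng_key('rng.0.cuda', ('rng', 0, 'cuda'))    # True: expected key shape and 3-tuple value
_is_rng_key('rng.3.numpy', ('rng', 3, 'numpy'))  # True
_is_rng_key('state.model.rng_proj.weight', ('state', 'model', 'weight'))  # False: does not start with 'rng',
                                                                          # though the old substring test would match
_is_rng_key('rng.0.cuda', ('rng', 0))            # False: value is not a 3-tuple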
@@ -608,42 +623,50 @@ def dist_cp_load(
load_planner: Optional[LoadPlanner] = None,
):
if version.parse(torch.__version__) >= version.parse('2.4.0'):
from torch.distributed.checkpoint.utils import CheckpointException
try:
dist_cp.load(
state_dict=state_dict,
storage_reader=storage_reader,
planner=load_planner,
)
except CheckpointException as e:
checkpoint_metadata = storage_reader.read_metadata().state_dict_metadata
if 'state.metadata' in checkpoint_metadata and 'state.metadata.composer_env_info.composer_version' not in checkpoint_metadata:
# Torch 2.4 changed the way how state dict is flattened. It broke backward compatibility.
# Torch issue: https://github.com/pytorch/pytorch/issues/133923.
# We override the traverse_state_dict so that the load planner could
# use the old way of flattening the state dict
log.debug('Trying to load checkpointing saved before torch 2.4')

import torch.distributed.checkpoint._nested_dict as nested_dict
import torch.distributed.checkpoint._sharded_tensor_utils as sharded_tensor_util
from torch.distributed.checkpoint._traverse import traverse_state_dict as traverse_2_4_0

from composer.trainer._patch_pytorch import traverse_state_dict as backward_compatible_traverse

nested_dict.traverse_state_dict = backward_compatible_traverse
sharded_tensor_util.traverse_state_dict = backward_compatible_traverse

if version.parse(torch.__version__) < version.parse('2.4.1'):
# PyTorch 2.4.0
from torch.distributed.checkpoint.utils import CheckpointException
try:
dist_cp.load(
state_dict=state_dict,
storage_reader=storage_reader,
planner=load_planner,
)
# Revert the override
nested_dict.traverse_state_dict = traverse_2_4_0
sharded_tensor_util.traverse_state_dict = traverse_2_4_0
else:
raise e

except CheckpointException as e:
checkpoint_metadata = storage_reader.read_metadata().state_dict_metadata
if 'state.metadata' in checkpoint_metadata and 'state.metadata.composer_env_info.composer_version' not in checkpoint_metadata:
# Torch 2.4 changed the way how state dict is flattened. It broke backward compatibility.
# Torch issue: https://github.com/pytorch/pytorch/issues/133923.
# We override the traverse_state_dict so that the load planner could
# use the old way of flattening the state dict
log.debug('Trying to load checkpointing saved before torch 2.4')

import torch.distributed.checkpoint._nested_dict as nested_dict
import torch.distributed.checkpoint._sharded_tensor_utils as sharded_tensor_util
from torch.distributed.checkpoint._traverse import traverse_state_dict as traverse_2_4_0

from composer.trainer._patch_pytorch import traverse_state_dict as backward_compatible_traverse

nested_dict.traverse_state_dict = backward_compatible_traverse
sharded_tensor_util.traverse_state_dict = backward_compatible_traverse

dist_cp.load(
state_dict=state_dict,
storage_reader=storage_reader,
planner=load_planner,
)
# Revert the override
nested_dict.traverse_state_dict = traverse_2_4_0
sharded_tensor_util.traverse_state_dict = traverse_2_4_0
else:
raise e
else:
# PyTorch 2.4.1
dist_cp.load(
state_dict=state_dict,
storage_reader=storage_reader,
planner=load_planner,
)
else:
dist_cp.load_state_dict(
state_dict=state_dict,
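Net effect of the restructuring: the pre-torch-2.4 flattening fallback (monkeypatching `traverse_state_dict`, retrying the load, then reverting) is now attempted only on PyTorch 2.4.0, while 2.4.1+ calls `dist_cp.load` directly. A condensed sketch of the control flow, with the patch-and-revert steps folded into a hypothetical `_load_with_legacy_traverse` helper and the checkpoint-metadata check omitted:

import torch
import torch.distributed.checkpoint as dist_cp
from packaging import version
from torch.distributed.checkpoint.utils import CheckpointException


def _dist_cp_load_sketch(state_dict, storage_reader, load_planner=None):
    # Assumes torch >= 2.4.0, matching the outer branch in checkpoint.py.
    if version.parse(torch.__version__) >= version.parse('2.4.1'):
        # PyTorch 2.4.1+: load directly, no backward-compatibility fallback.
        dist_cp.load(state_dict=state_dict, storage_reader=storage_reader, planner=load_planner)
        return
    try:
        dist_cp.load(state_dict=state_dict, storage_reader=storage_reader, planner=load_planner)
    except CheckpointException:
        # PyTorch 2.4.0 only: retry with the backward-compatible traverse_state_dict override.
        _load_with_legacy_traverse(state_dict, storage_reader, load_planner)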
5 changes: 4 additions & 1 deletion composer/utils/remote_uploader.py
@@ -92,7 +92,10 @@ def upload_file(retry_index: int = 0):
# Good! It shouldn't exist.
pass
else:
raise FileExistsError(f'Object {remote_file_name} already exists, but overwrite was set to False.')
raise FileExistsError(
f'Object {remote_file_name} already exists, but overwrite was set to False. '
'Please set `save_overwrite` to `True` in Trainer to overwrite the existing file.',
)
log.info(f'Uploading file {local_file_path} to {remote_file_name}')
object_store.upload_object(
object_name=remote_file_name,
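The expanded error message points users at the relevant Trainer flag. Roughly how that looks at the call site (the model, dataloader, and save_folder URI are placeholders for an existing setup):

from composer import Trainer

trainer = Trainer(
    model=my_composer_model,                   # an existing ComposerModel
    train_dataloader=my_train_dataloader,
    max_duration='1ep',
    save_folder='s3://my-bucket/checkpoints',  # illustrative remote path
    save_overwrite=True,                       # overwrite existing objects instead of raising FileExistsError
)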