From 04edabbb379c71d82850ea52cbeee5e59f33b9e0 Mon Sep 17 00:00:00 2001 From: Gal Rotem Date: Thu, 14 Dec 2023 20:20:14 -0800 Subject: [PATCH] add decorators for UTs: gpu and distributed available (#655) Summary: Syntactic sugar instead of copy-pasting the `.skipUnless` with the description across all tests Pull Request resolved: https://github.com/pytorch/tnt/pull/655 Reviewed By: JKSenthil Differential Revision: D52169103 fbshipit-source-id: 4f7ea8a3a821fda9e405358eb5a029e0bf786139 --- tests/framework/test_auto_unit.py | 52 ++++++++--------------------- tests/utils/test_distributed_gpu.py | 23 +++++-------- tests/utils/test_timer.py | 16 ++++----- torchtnt/utils/test_utils.py | 13 ++++++++ 4 files changed, 42 insertions(+), 62 deletions(-) diff --git a/tests/framework/test_auto_unit.py b/tests/framework/test_auto_unit.py index b657f2ea17..838656f276 100644 --- a/tests/framework/test_auto_unit.py +++ b/tests/framework/test_auto_unit.py @@ -12,6 +12,7 @@ import torch from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler from torchtnt.framework.auto_unit import TrainStepResults +from torchtnt.utils.test_utils import skip_if_not_distributed, skip_if_not_gpu from torchtnt.utils.version import is_torch_version_geq_1_13 @@ -57,9 +58,6 @@ class TestAutoUnit(unittest.TestCase): - cuda_available: bool = torch.cuda.is_available() - distributed_available: bool = torch.distributed.is_available() - def test_app_state_mixin(self) -> None: """ Test that app_state, tracked_optimizers, tracked_lr_schedulers are set as expected with AutoUnit @@ -81,12 +79,8 @@ def test_app_state_mixin(self) -> None: for key in ("module", "optimizer", "lr_scheduler", "grad_scaler"): self.assertIn(key, auto_unit.app_state()) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu + @skip_if_not_distributed def test_fsdp_fp16(self) -> None: """ Test that FSDP + FP16 uses ShardedGradScaler @@ -154,9 +148,7 @@ def test_lr_scheduler_epoch(self) -> None: train(auto_unit, train_dataloader=train_dl, max_epochs=max_epochs) self.assertEqual(auto_unit.lr_scheduler.step.call_count, max_epochs) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu @patch("torch.autocast") def test_mixed_precision_fp16(self, mock_autocast: MagicMock) -> None: """ @@ -177,9 +169,7 @@ def test_mixed_precision_fp16(self, mock_autocast: MagicMock) -> None: device_type="cuda", dtype=torch.float16, enabled=True ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu @patch("torch.autocast") def test_mixed_precision_bf16(self, mock_autocast: MagicMock) -> None: """ @@ -318,12 +308,8 @@ def test_stochastic_weight_averaging_update_freq(self) -> None: # 1 warmup + epoch 2 + epoch 3 = 2 self.assertEqual(update_swa_mock.call_count, 2) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." 
- ) + @skip_if_not_distributed + @skip_if_not_gpu def test_stochastic_weight_averaging_fsdp(self) -> None: """ Test that swa params with FSDP is identical to non-FSDP swa @@ -399,9 +385,7 @@ def forward(self, x): for p1, p2 in zip(swa_params, swa_fsdp_params, strict=True): torch.testing.assert_close(p2, p1, check_device=False) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu @patch("torch.autocast") def test_eval_mixed_precision_bf16(self, mock_autocast: MagicMock) -> None: """ @@ -423,12 +407,8 @@ def test_eval_mixed_precision_bf16(self, mock_autocast: MagicMock) -> None: device_type="cuda", dtype=torch.bfloat16, enabled=True ) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu + @skip_if_not_distributed def test_no_sync(self) -> None: """ Test that the no_sync autocast context is correctly applied when using gradient accumulation @@ -571,9 +551,7 @@ def test_configure_optimizers_and_lr_scheduler_called_once(self) -> None: ) self.assertEqual(configure_optimizers_and_lr_scheduler_mock.call_count, 1) - @unittest.skipUnless( - condition=distributed_available, reason="Torch distributed is needed to run" - ) + @skip_if_not_distributed def test_auto_unit_ddp(self) -> None: """ Launch tests of AutoUnit with DDP strategy @@ -766,9 +744,7 @@ def test_auto_unit_timing_predict(self) -> None: timer=Timer(), ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." - ) + @skip_if_not_gpu @patch("torch.autocast") def test_predict_mixed_precision_fp16(self, mock_autocast: MagicMock) -> None: """ @@ -793,9 +769,7 @@ def test_predict_mixed_precision_fp16(self, mock_autocast: MagicMock) -> None: condition=COMPILE_AVAIL, reason="This test needs PyTorch 1.13 or greater to run.", ) - @unittest.skipUnless( - condition=cuda_available, reason="This test needs a GPU host to run." 
- ) + @skip_if_not_gpu @patch("torch.compile") def test_compile_predict(self, mock_dynamo: MagicMock) -> None: """ diff --git a/tests/utils/test_distributed_gpu.py b/tests/utils/test_distributed_gpu.py index fe6df63f58..a57348e5bf 100644 --- a/tests/utils/test_distributed_gpu.py +++ b/tests/utils/test_distributed_gpu.py @@ -12,18 +12,16 @@ from torchtnt.utils.device import get_device_from_env from torchtnt.utils.distributed import all_gather_tensors, get_local_rank, PGWrapper from torchtnt.utils.env import init_from_env -from torchtnt.utils.test_utils import spawn_multi_process +from torchtnt.utils.test_utils import ( + skip_if_not_distributed, + skip_if_not_gpu, + spawn_multi_process, +) class DistributedGPUTest(unittest.TestCase): - dist_available: bool = torch.distributed.is_available() - cuda_available: bool = torch.cuda.is_available() - - @unittest.skipUnless( - condition=cuda_available, - reason="This test should only run on a GPU host.", - ) - @unittest.skipUnless(dist_available, reason="Torch distributed is needed to run") + @skip_if_not_gpu + @skip_if_not_distributed def test_gather_uneven_multidim_nccl(self) -> None: spawn_multi_process( 2, @@ -43,11 +41,8 @@ def _test_ddp_gather_uneven_tensors_multidim_nccl() -> None: assert val.shape == (idx + 1, 4 - idx) assert (val == 1).all() - @unittest.skipUnless( - condition=cuda_available, - reason="This test should only run on a GPU host.", - ) - @unittest.skipUnless(dist_available, reason="Torch distributed is needed to run") + @skip_if_not_gpu + @skip_if_not_distributed def test_pg_wrapper_scatter_object_list_nccl(self) -> None: spawn_multi_process( 2, diff --git a/tests/utils/test_timer.py b/tests/utils/test_timer.py index 0fdeaf0ae2..bf27ae87f3 100644 --- a/tests/utils/test_timer.py +++ b/tests/utils/test_timer.py @@ -16,7 +16,11 @@ import torch import torch.distributed as dist from pyre_extensions import none_throws -from torchtnt.utils.test_utils import spawn_multi_process +from torchtnt.utils.test_utils import ( + skip_if_not_distributed, + skip_if_not_gpu, + spawn_multi_process, +) from torchtnt.utils.timer import ( BoundedTimer, FullSyncPeriodicTimer, @@ -100,10 +104,7 @@ def test_timer_context_manager(self, _) -> None: timer.recorded_durations["action_4"][0], intervals[2] ) - @unittest.skipUnless( - condition=bool(torch.cuda.is_available()), - reason="This test needs a GPU host to run.", - ) + @skip_if_not_gpu @patch("torch.cuda.synchronize") def test_timer_synchronize(self, mock_synchornize: Mock) -> None: """Make sure that torch.cuda.synchronize() is called when GPU is present.""" @@ -230,10 +231,7 @@ def _get_synced_durations_histogram_multi_process() -> None: tc = unittest.TestCase() tc.assertEqual(durations, expected_durations) - @unittest.skipUnless( - condition=bool(dist.is_available()), - reason="This test should only run if torch.distributed is available.", - ) + @skip_if_not_distributed def test_get_synced_durations_histogram_multi_process(self) -> None: spawn_multi_process( 2, "gloo", self._get_synced_durations_histogram_multi_process diff --git a/torchtnt/utils/test_utils.py b/torchtnt/utils/test_utils.py index 7802f8f7ea..7bcc9ab1a3 100644 --- a/torchtnt/utils/test_utils.py +++ b/torchtnt/utils/test_utils.py @@ -16,6 +16,8 @@ from io import StringIO from typing import Any, Callable, Dict, Generator, Optional, TextIO, Tuple, TypeVar +import torch + import torch.distributed.launcher as pet from pyre_extensions import ParameterSpecification from torch import distributed as dist, multiprocessing @@ -149,3 +151,14 @@ 
def captured_output() -> Generator[Tuple[TextIO, TextIO], None, None]: yield sys.stdout, sys.stderr finally: sys.stdout, sys.stderr = old_out, old_err + + +"""Decorator for tests to ensure running on a GPU.""" +skip_if_not_gpu: Callable[..., Callable[..., object]] = unittest.skipUnless( + torch.cuda.is_available(), "Skipping test since GPU is not available" +) + +"""Decorator for tests to ensure running when distributed is available.""" +skip_if_not_distributed: Callable[..., Callable[..., object]] = unittest.skipUnless( + torch.distributed.is_available(), "Skipping test since distributed is not available" +)
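
For reference, a minimal sketch of how a test might apply the new decorators once this patch lands. The class and test names below are illustrative only and are not part of the patch; the decorators simply wrap unittest.skipUnless, so they stack like any other unittest skip decorator.

    import unittest

    import torch
    from torchtnt.utils.test_utils import skip_if_not_distributed, skip_if_not_gpu


    class ExampleTest(unittest.TestCase):
        @skip_if_not_gpu
        @skip_if_not_distributed
        def test_needs_gpu_and_distributed(self) -> None:
            # Runs only when both torch.cuda.is_available() and
            # torch.distributed.is_available() are True; otherwise the
            # test is reported as skipped with the decorator's reason.
            self.assertTrue(torch.cuda.is_available())
            self.assertTrue(torch.distributed.is_available())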