From 33fd4630d1d4dbb9ac2e5d7f72014637ba49d7e3 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Wed, 7 Feb 2024 13:23:53 -0500 Subject: [PATCH 01/16] add 2.2 tests (#2970) --- .github/workflows/daily.yaml | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/.github/workflows/daily.yaml b/.github/workflows/daily.yaml index 02238a51fa6..25f686cf897 100644 --- a/.github/workflows/daily.yaml +++ b/.github/workflows/daily.yaml @@ -33,6 +33,11 @@ jobs: markers: not daily and (remote or not remote) and not gpu and not doctest pytest_command: coverage run -m pytest composer_package_name: composer + - name: cpu-3.11-2.2 + container: mosaicml/pytorch:2.2.0_cpu-python3.11-ubuntu20.04 + markers: not daily and (remote or not remote) and not gpu and not doctest + pytest_command: coverage run -m pytest + composer_package_name: mosaicml - name: cpu-doctest container: mosaicml/pytorch:2.1.2_cpu-python3.10-ubuntu20.04 markers: not daily and (remote or not remote) and not gpu and doctest @@ -53,6 +58,11 @@ jobs: markers: daily and (remote or not remote) and not gpu and not doctest pytest_command: coverage run -m pytest composer_package_name: composer + - name: daily-cpu-3.11-2.2 + container: mosaicml/pytorch:2.2.0_cpu-python3.11-ubuntu20.04 + markers: daily and (remote or not remote) and not gpu and not doctest + pytest_command: coverage run -m pytest + composer_package_name: mosaicml - name: daily-cpu-doctest container: mosaicml/pytorch:2.1.2_cpu-python3.10-ubuntu20.04 markers: daily and (remote or not remote) and not gpu and doctest @@ -100,7 +110,12 @@ jobs: pytest_command: "coverage run -m pytest" composer_package_name: "mosaicml" - name: "gpu-3.10-2.1" - container: mosaicml/pytorch:2.1.0_cu121-python3.10-ubuntu20.04 + container: mosaicml/pytorch:2.1.2_cu121-python3.10-ubuntu20.04 + markers: "(daily or not daily) and (remote or not remote) and gpu and (doctest or not doctest)" + pytest_command: "coverage run -m pytest" + composer_package_name: "mosaicml" + - name: "gpu-3.10-2.2" + container: mosaicml/pytorch:2.2.0_cu121-python3.10-ubuntu20.04 markers: "(daily or not daily) and (remote or not remote) and gpu and (doctest or not doctest)" pytest_command: "coverage run -m pytest" composer_package_name: "mosaicml" From 7e6b7751e5fd53e935f8c6cf930de2812e7e0e4b Mon Sep 17 00:00:00 2001 From: Cheng Li Date: Wed, 7 Feb 2024 11:56:56 -0800 Subject: [PATCH 02/16] Memory snapshot dump pickle (#2968) * dump pickle in memorysnapshot * fix test * fix test --- composer/callbacks/memory_snapshot.py | 53 ++++++++++++++----------- tests/callbacks/test_memory_snapshot.py | 2 +- tests/callbacks/test_oom_observer.py | 2 +- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/composer/callbacks/memory_snapshot.py b/composer/callbacks/memory_snapshot.py index ca60219c303..5fc724eaeea 100644 --- a/composer/callbacks/memory_snapshot.py +++ b/composer/callbacks/memory_snapshot.py @@ -4,6 +4,7 @@ """Log memory snapshot during training.""" import logging import os +import pickle import warnings from typing import Optional, Union @@ -50,10 +51,10 @@ class MemorySnapshot(Callback): max_entries (int, optional): Maximum number of memory alloc/free events to record. Defaults to 100000. folder (str, optional): A format string describing the folder containing the memory snapshot files. Defaults to ``'{{run_name}}/torch_traces'``. - filename (str, optional): A format string describing how to name the memory snapshot files. - Defaults to ``'rank{{rank}}.{{batch}}.pickle'``. 
- remote_file_name (str, optional): A format string for the memory snapshot remote file name. - Defaults to ``'{{run_name}}/torch_traces/rank{{rank}}.{{batch}}.pickle'``. + filename (str, optional): A format string describing the prefix used to name the memory snapshot files. + Defaults to ``'rank{{rank}}.{{batch}}.memory_snapshot'``. + remote_file_name (str, optional): A format string describing the prefix for the memory snapshot remote file name. + Defaults to ``'{{run_name}}/torch_traces/rank{{rank}}.{{batch}}.memory_snapshot'``. Whenever a trace file is saved, it is also uploaded as a file according to this format string. The same format variables as for ``filename`` are available. @@ -74,9 +75,8 @@ def __init__( interval: Union[int, str, Time] = '3ba', max_entries: int = 100000, folder: str = '{run_name}/torch_traces', - filename: str = 'rank{rank}.{batch}.pt.trace.memory_snapshot.html', - remote_file_name: Optional[ - str] = '{run_name}/torch_memory_traces/rank{rank}.{batch}.pt.trace.memory_snapshot.html', + filename: str = 'rank{rank}.{batch}.memory_snapshot', + remote_file_name: Optional[str] = '{run_name}/torch_memory_traces/rank{rank}.{batch}.memory_snapshot', overwrite: bool = False, ) -> None: self.batches_left_to_skip = skip_batches @@ -157,26 +157,33 @@ def export_memory_snapshot(self, state: State, logger: Logger) -> None: self.folder_name, format_name_with_dist_and_time(self.filename, run_name=state.run_name, timestamp=state.timestamp)) try: - log.info(f'Saving memory snapshot to local file: {filename}') + snapshot_file = filename + '.pickle' + trace_plot_file = filename + '.html' + log.info(f'Saving memory snapshot files') + snapshot = torch.cuda.memory._snapshot() # No data was recorded - avoids a `ValueError` in `trace_plot` if all(len(t) == 0 for t in snapshot['device_traces']): log.info(f'No allocation is recorded in memory snapshot)') return - with open(filename, 'w+') as fd: - fd.write(torch.cuda._memory_viz.trace_plot(snapshot, device=None, plot_segments=False)) # type: ignore + + with open(snapshot_file, 'wb') as fd: + pickle.dump(snapshot, fd) + + with open(trace_plot_file, 'w+') as fd: + fd.write(torch.cuda._memory_viz.trace_plot(snapshot)) # type: ignore + + log.info(f'Saved memory snapshot to local files with prefix = {filename}') + + if self.remote_path_in_bucket is not None: + for f in [snapshot_file, trace_plot_file]: + remote_file_name = (self.remote_path_in_bucket + os.path.basename(f)).lstrip('/') + log.info(f'Uploading memory snapshot to remote: {remote_file_name} from {f}') + try: + logger.upload_file(remote_file_name=remote_file_name, file_path=f, overwrite=self.overwrite) + except FileExistsError as e: + raise FileExistsError( + f'Uploading memory snapshot failed with error: {e}. overwrite was set to {self.overwrite}. To overwrite memory snapshot with Trainer, set `overwrite` to True.' + ) from e except Exception as e: log.error(f'Failed to capture memory snapshot {e}') - return - if self.remote_path_in_bucket is not None: - remote_file_name = format_name_with_dist_and_time(self.remote_path_in_bucket, - run_name=state.run_name, - timestamp=state.timestamp) - remote_file_name = remote_file_name.lstrip('/') - log.info(f'Uploading memory snapshot to remote: {remote_file_name} from {filename}') - try: - logger.upload_file(remote_file_name=remote_file_name, file_path=filename, overwrite=self.overwrite) - except FileExistsError as e: - raise FileExistsError( - f'Uploading memory snapshot failed with error: {e}. overwrite was set to {self.overwrite}. 
To overwrite memory snapshot with Trainer, set save_overwrite to True.' - ) from e diff --git a/tests/callbacks/test_memory_snapshot.py b/tests/callbacks/test_memory_snapshot.py index 1aee11c2f6c..0bafbcb1c13 100644 --- a/tests/callbacks/test_memory_snapshot.py +++ b/tests/callbacks/test_memory_snapshot.py @@ -58,5 +58,5 @@ def test_memory_snapshot(interval: str): max_duration='2ba', ) trainer.fit() - assert len(file_tracker_destination.uploaded_files) == 1 + assert len(file_tracker_destination.uploaded_files) == 2 trainer.close() diff --git a/tests/callbacks/test_oom_observer.py b/tests/callbacks/test_oom_observer.py index 5fbb5bd8a31..60323b00c03 100644 --- a/tests/callbacks/test_oom_observer.py +++ b/tests/callbacks/test_oom_observer.py @@ -85,4 +85,4 @@ def test_oom_observer_with_memory_snapshot(): ) trainer.fit() - assert len(file_tracker_destination.uploaded_files) == 1 + assert len(file_tracker_destination.uploaded_files) == 2 From a48f9fa19d4a86c65b661b0c6c6555982559d281 Mon Sep 17 00:00:00 2001 From: AleksanderWWW Date: Wed, 7 Feb 2024 21:06:52 +0100 Subject: [PATCH 03/16] Neptune logger (#2447) * draft neptune logger * correct setup file * add upload_file method * implement download_file method * misc fixes * add basic test cases * add test for uploading and deleting file * docstring update * Apply suggestions from code review Co-authored-by: Siddhant Sadangi * name improvement + minor refactor * code review * fix logging big ints * update neptune reqs * no neptune imports outside of logger methods * Update composer/loggers/neptune_logger.py Co-authored-by: Mihir Patel * update tests * Update composer/loggers/neptune_logger.py Co-authored-by: Siddhant Sadangi * Docstrings Co-authored-by: Sabine * code review 1 * code review 2 * set neptune run to None after stopping * Update composer/loggers/neptune_logger.py Co-authored-by: Sabine * validate images and channels * code review + tests * remove example * docs augmented * Update docs/source/trainer/file_uploading.rst Co-authored-by: Sabine * code review 1 * more fixes * try to fix pre-commit * small refactor * fix doctest * docstrings adjustment * remove step from metrics logging * try to resolve step duplication * call original epoch_end method * don't import in vain * handle duplicated step numbers by adding epoch number * code review 1 (to be continued...) * code review 2 (to be continued...) 
* walrus operator (composer min py version is 3.8) * code review 3 * docs and readme update * extract part of sanitization logic to seperate method * image visualizer docstring * Update composer/loggers/neptune_logger.py Co-authored-by: Sabine * fix * fix typo ind ocs * remove example, creds fail * fix * patch key * remove doctest * restore * maybe a fix * fix * remove doctest * fix * debug mode * fix --------- Co-authored-by: Siddhant Sadangi Co-authored-by: Mihir Patel Co-authored-by: Sabine --- .gitignore | 3 + README.md | 2 +- composer/callbacks/image_visualizer.py | 2 +- composer/loggers/__init__.py | 2 + composer/loggers/neptune_logger.py | 307 +++++++++++++++++++ docs/source/doctest_fixtures.py | 7 + docs/source/getting_started/installation.rst | 1 + docs/source/trainer/file_uploading.rst | 9 +- docs/source/trainer/logging.rst | 5 +- setup.py | 4 + tests/callbacks/callback_settings.py | 15 +- tests/loggers/test_neptune_logger.py | 149 +++++++++ 12 files changed, 498 insertions(+), 8 deletions(-) create mode 100644 composer/loggers/neptune_logger.py create mode 100644 tests/loggers/test_neptune_logger.py diff --git a/.gitignore b/.gitignore index 9b66bf52db2..789c75183bd 100644 --- a/.gitignore +++ b/.gitignore @@ -136,6 +136,9 @@ venv/ # WandB wandb/ +# Neptune +.neptune/ + # Spacemacs ._#* .#* diff --git a/README.md b/README.md index 8bdda2d3e0f..9ab992be3a7 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ Composer is built to automate away low-level pain points and headaches so you ca Integrate with the tools you know and love for experiment tracking and data streaming. - **Cloud integrations**: Our Checkpointing and logging features have first-class support for remote storage and loading from Cloud bucket (OCI, GCP, AWS S3). -- **********Experiment tracking:********** Weights and Biases, MLFlow, and CometML — the choice is yours, easily log your data to your favorite platform. +- **********Experiment tracking:********** Weights and Biases, MLFlow, CometML, and neptune.ai — the choice is yours, easily log your data to your favorite platform. # **🚀 Getting Started** diff --git a/composer/callbacks/image_visualizer.py b/composer/callbacks/image_visualizer.py index c1a9379665f..ccc080058ca 100644 --- a/composer/callbacks/image_visualizer.py +++ b/composer/callbacks/image_visualizer.py @@ -46,7 +46,7 @@ class ImageVisualizer(Callback): +---------------------------------------------+---------------------------------------+ .. note:: - This callback only works with wandb logging for now. + This callback only works with wandb and Neptune logging for now. Args: interval (int | str | Time, optional): Time string specifying how often to log train images. 
For example, ``interval='1ep'`` diff --git a/composer/loggers/__init__.py b/composer/loggers/__init__.py index b46039fbe35..d95ca05c362 100644 --- a/composer/loggers/__init__.py +++ b/composer/loggers/__init__.py @@ -20,6 +20,7 @@ from composer.loggers.logger_destination import LoggerDestination from composer.loggers.mlflow_logger import MLFlowLogger from composer.loggers.mosaicml_logger import MosaicMLLogger +from composer.loggers.neptune_logger import NeptuneLogger from composer.loggers.progress_bar_logger import ProgressBarLogger from composer.loggers.remote_uploader_downloader import RemoteUploaderDownloader from composer.loggers.slack_logger import SlackLogger @@ -32,6 +33,7 @@ 'LoggerDestination', 'FileLogger', 'InMemoryLogger', + 'NeptuneLogger', 'ProgressBarLogger', 'WandBLogger', 'RemoteUploaderDownloader', diff --git a/composer/loggers/neptune_logger.py b/composer/loggers/neptune_logger.py new file mode 100644 index 00000000000..c1718a5c099 --- /dev/null +++ b/composer/loggers/neptune_logger.py @@ -0,0 +1,307 @@ +# Copyright 2022 MosaicML Composer authors +# SPDX-License-Identifier: Apache-2.0 + +"""Log training metadata to [neptune.ai](https://neptune.ai/).""" + +__all__ = ['NeptuneLogger'] + +import os +import pathlib +import warnings +from functools import partial +from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Set, Union + +import numpy as np +import torch + +from composer._version import __version__ +from composer.loggers import LoggerDestination +from composer.utils import MissingConditionalImportError, dist + +if TYPE_CHECKING: + from composer import Logger + from composer.core import State + + +class NeptuneLogger(LoggerDestination): + """Log to `neptune.ai `_. + + For more, see the [Neptune-Composer integration guide](https://docs.neptune.ai/integrations/composer/). + + Args: + project (str, optional): The name of your Neptune project, + in the form "workspace-name/project-name". If you leave it empty, the + ``NEPTUNE_PROJECT`` environment variable will be used. + api_token (str, optional): Your Neptune API token. + You can leave out this argument if you save your token to the + ``NEPTUNE_API_TOKEN`` environment variable (recommended). + You can find your API token in the user menu of the Neptune web app. + rank_zero_only (bool, optional): Whether to log only on the rank-zero process. + (default: ``True``). + upload_artifacts (bool, optional): Whether the logger should upload artifacts to Neptune. + (default: ``False``). + base_namespace (str, optional): The name of the base namespace to log the metadata to. + (default: "training"). + neptune_kwargs (Dict[str, Any], optional): Any additional keyword arguments to the + ``neptune.init_run()`` function. For options, see the + `Run API reference `_ in the + Neptune docs. 
+ """ + metric_namespace = 'metrics' + hyperparam_namespace = 'hyperparameters' + trace_namespace = 'traces' + integration_version_key = 'source_code/integrations/neptune-MosaicML' + + def __init__( + self, + *, + project: Optional[str] = None, + api_token: Optional[str] = None, + rank_zero_only: bool = True, + upload_artifacts: bool = False, + base_namespace: str = 'training', + **neptune_kwargs, + ) -> None: + try: + from neptune.internal.utils import verify_type + except ImportError as e: + raise MissingConditionalImportError(extra_deps_group='neptune', + conda_package='neptune', + conda_channel='conda-forge') from e + + verify_type('project', project, (str, type(None))) + verify_type('api_token', api_token, (str, type(None))) + verify_type('rank_zero_only', rank_zero_only, bool) + verify_type('upload_artifacts', upload_artifacts, bool) + verify_type('base_namespace', base_namespace, str) + + if not base_namespace: + raise ValueError("Argument 'base_namespace' cannot be an empty string.") + + self._project = project + self._api_token = api_token + self._rank_zero_only = rank_zero_only + self._upload_artifacts = upload_artifacts + self._base_namespace = base_namespace + self._neptune_kwargs = neptune_kwargs + + mode = self._neptune_kwargs.pop('mode', 'async') + + self._enabled = (not rank_zero_only) or dist.get_global_rank() == 0 + + self._mode = mode if self._enabled else 'debug' + + self._neptune_run = None + self._base_handler = None + + self._metrics_dict: Dict[str, int] = {} # used to prevent duplicate step logging + + super().__init__() + + @property + def neptune_run(self): + """Gets the Neptune run object from a NeptuneLogger instance. + + You can log additional metadata to the run by accessing a path inside the run and assigning metadata to it + with "=" or [Neptune logging methods](https://docs.neptune.ai/logging/methods/). + + Example: + from composer import Trainer + from composer.loggers import NeptuneLogger + neptune_logger = NeptuneLogger() + trainer = Trainer(loggers=neptune_logger, ...) + trainer.fit() + neptune_logger.neptune_run["some_metric"] = 1 + trainer.close() + """ + from neptune import Run + + if not self._neptune_run: + self._neptune_run = Run( + project=self._project, + api_token=self._api_token, + mode=self._mode, + **self._neptune_kwargs, + ) + return self._neptune_run + + @property + def base_handler(self): + """Gets a handler for the base logging namespace. + + Use the handler to log extra metadata to the run and organize it under the base namespace (default: "training"). + You can operate on it like a run object: Access a path inside the handler and assign metadata to it with "=" or + other [Neptune logging methods](https://docs.neptune.ai/logging/methods/). + + Example: + from composer import Trainer + from composer.loggers import NeptuneLogger + neptune_logger = NeptuneLogger() + trainer = Trainer(loggers=neptune_logger, ...) + trainer.fit() + neptune_logger.base_handler["some_metric"] = 1 + trainer.close() + Result: The value `1` is organized under "training/some_metric" inside the run. 
+ """ + return self.neptune_run[self._base_namespace] + + def init(self, state: 'State', logger: 'Logger') -> None: + del logger # unused + + self.base_handler['rank'] = dist.get_global_rank() + + if self._enabled: + self.neptune_run['sys/name'] = state.run_name + self.neptune_run[self.integration_version_key] = __version__ + + def _sanitize_metrics(self, metrics: Dict[str, float], step: Optional[int]) -> Dict[str, float]: + """Sanitize metrics to prevent duplicate step logging. + + Args: + metrics (Dict[str, float]): Metrics to log. + step (Optional[int]): Step to log metrics at. + + Returns: + Dict[str, float]: Sanitized metrics. + """ + keys_to_delete: Set[str] = set() + + for k in metrics: + self._process_single_metric(k, step, keys_to_delete) + + return dict(filter(lambda x: x[0] not in keys_to_delete, metrics.items())) + + def _process_single_metric(self, metric_key: str, step: Optional[int], keys_to_delete: Set[str]) -> None: + if metric_key not in self._metrics_dict: + self._metrics_dict[metric_key] = step if step is not None else 0 + else: + if step is not None: + if step <= self._metrics_dict[metric_key]: + # we cannot insert metrics earlier than or in place of an existing metric point + keys_to_delete.add(metric_key) + else: + self._metrics_dict[metric_key] = step + else: + self._metrics_dict[metric_key] += 1 + + def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None: + if not self._enabled: + return + + from neptune.utils import stringify_unsupported + + if metrics_to_log := self._sanitize_metrics(metrics, step): + self.base_handler[NeptuneLogger.metric_namespace].append(stringify_unsupported(metrics_to_log), step=step) + + def log_hyperparameters(self, hyperparameters: Dict[str, Any]) -> None: + if not self._enabled: + return + + from neptune.utils import stringify_unsupported + + self.base_handler[NeptuneLogger.hyperparam_namespace] = stringify_unsupported(hyperparameters) + + def log_traces(self, traces: Dict[str, Any]): + if not self._enabled: + return + + from neptune.utils import stringify_unsupported + + self.base_handler[NeptuneLogger.trace_namespace] = stringify_unsupported(traces) + + def can_upload_files(self) -> bool: + """Whether the logger supports uploading files.""" + return self._enabled and self._upload_artifacts + + def upload_file( + self, + state: 'State', + remote_file_name: str, + file_path: pathlib.Path, + *, + overwrite: bool = False, + ): + if not self.can_upload_files(): + return + + neptune_path = f'{self._base_namespace}/{remote_file_name}' + if self.neptune_run.exists(neptune_path) and not overwrite: + + warnings.warn(f"The file '{neptune_path}' already exists and overwrite is set to False." + 'No action will be taken.') + return + + del state # unused + self.base_handler[remote_file_name].upload(str(file_path)) + + def download_file( + self, + remote_file_name: str, + destination: str, + overwrite: bool = False, + progress_bar: bool = True, + ): + del progress_bar # not supported + + if not self._enabled: + return + + if os.path.exists(os.path.join( + destination, + remote_file_name, + )) and not overwrite: + warnings.warn(f"Destination '{destination}' already exists and overwrite is set to False." 
+ 'No action will be taken.') + return + + file_path = f'{self._base_namespace}/{remote_file_name}' + if not self.neptune_run.exists(file_path): + raise FileNotFoundError(f'File {file_path} not found') + + self.base_handler[remote_file_name].download(destination=destination) + + def log_images( + self, + images: Union[np.ndarray, torch.Tensor, Sequence[Union[np.ndarray, torch.Tensor]]], + name: str = 'Images', + channels_last: bool = False, + step: Optional[int] = None, + masks: Optional[Dict[str, Union[np.ndarray, torch.Tensor, Sequence[Union[np.ndarray, torch.Tensor]]]]] = None, + mask_class_labels: Optional[Dict[int, str]] = None, + use_table: bool = True, + ): + if not self._enabled: + return + + from neptune.types import File + + if not isinstance(images, Sequence) and images.ndim <= 3: + images = _validate_image(images, channels_last=channels_last) + self.base_handler[name].append(File.as_image(images), step=step) + + else: + images = list(map(partial(_validate_image, channels_last=channels_last), images)) + self.base_handler[name].extend([File.as_image(img) for img in images]) + + def post_close(self) -> None: + if not self._enabled: + return + + if self._neptune_run: + self._neptune_run.stop() + self._neptune_run = None + + +def _validate_image(img: Union[np.ndarray, torch.Tensor], channels_last: bool) -> np.ndarray: + img_numpy = img.data.cpu().numpy() if isinstance(img, torch.Tensor) else img + + assert isinstance(img_numpy, np.ndarray) + + # Error out for empty arrays or weird arrays of dimension 0. + if np.any(np.equal(img_numpy.shape, 0)): + raise ValueError(f'Got an image (shape {img_numpy.shape}) with at least one dimension being 0! ') + + if not channels_last: + img_numpy = np.moveaxis(img_numpy, 0, -1) + + return img_numpy diff --git a/docs/source/doctest_fixtures.py b/docs/source/doctest_fixtures.py index 89d068efe2b..91b7c909b8d 100644 --- a/docs/source/doctest_fixtures.py +++ b/docs/source/doctest_fixtures.py @@ -71,6 +71,13 @@ except ImportError: _COMETML_INSTALLED = False +try: + import neptune + _NEPTUNE_INSTALLED = True + del neptune # unused +except ImportError: + _NEPTUNE_INSTALLED = False + try: import libcloud _LIBCLOUD_INSTALLED = True diff --git a/docs/source/getting_started/installation.rst b/docs/source/getting_started/installation.rst index b2cebc52812..100247983af 100644 --- a/docs/source/getting_started/installation.rst +++ b/docs/source/getting_started/installation.rst @@ -22,6 +22,7 @@ the following installation targets are available: * ``pip install 'mosaicml[nlp]'``: Installs Composer with support for NLP models and algorithms. * ``pip install 'mosaicml[wandb]'``: Installs Composer with support for :mod:`wandb`. * ``pip install 'mosaicml[comet_ml]'``: Installs Composer with support for :mod:`comet_ml`. +* ``pip install 'mosaicml[neptune]'``: Installs Composer with support for :mod:`neptune`. * ``pip install 'mosaicml[tensorboard]'``: Installs Composer with support for :mod:`tensorboard`. * ``pip install 'mosaicml[streaming]'``: Installs Composer with support for `streaming `_. * ``pip install 'mosaicml[mlflow]'``: Installs Composer with support for :mod:`mlflow`. diff --git a/docs/source/trainer/file_uploading.rst b/docs/source/trainer/file_uploading.rst index 9286db2d2a1..b6224a9bd7c 100644 --- a/docs/source/trainer/file_uploading.rst +++ b/docs/source/trainer/file_uploading.rst @@ -88,7 +88,8 @@ To store files remotely, in the ``loggers`` argument to the Trainer constructor, .. 
seealso:: - The built-in :class:`~composer.loggers.wandb_logger.WandBLogger` and + The built-in :class:`~composer.loggers.wandb_logger.WandBLogger`, + :class:`~composer.loggers.neptune_logger.NeptuneLogger and :class:`~composer.loggers.remote_uploader_downloader.RemoteUploaderDownloader` implement this method -- see the examples below. @@ -103,12 +104,16 @@ to upload them. Otherwise, you could run into an infinite loop! Where can I remotely store files? --------------------------------- -Composer includes two built-in LoggerDestinations to store artifacts: +Composer includes three built-in LoggerDestinations to store artifacts: * The :class:`~composer.loggers.wandb_logger.WandBLogger` can upload Composer training files as `W & B Artifacts `_, which are associated with the corresponding W & B project. +* The :class:`~composer.logger.neptune_logger.NeptuneLogger` can upload Composer training files + as `Neptune Files `_, which are associated with the corresponding + Neptune project. + * The :class:`~composer.loggers.remote_uploader_downloader.RemoteUploaderDownloader` can upload Composer training files to any cloud storage backend or remote filesystem. We include integrations for AWS S3 and SFTP (see the :ref:`examples ` below), and you can write your own integration for a custom backend. diff --git a/docs/source/trainer/logging.rst b/docs/source/trainer/logging.rst index 7458f89ff6c..b44f919016f 100644 --- a/docs/source/trainer/logging.rst +++ b/docs/source/trainer/logging.rst @@ -24,7 +24,7 @@ arguments in :class:`.Trainer`, like the following code, which will log metrics To attach other loggers, use the ``loggers`` argument. For example, the below logs the results to `Weights and -Biases `__, `MLflow `__, and `CometML `__, +Biases `__, `MLflow `__, `CometML `__, and `neptune.ai `__, and also saves them to the file ``log.txt``. 
@@ -41,7 +41,7 @@ and also saves them to the file :skipif: not _WANDB_INSTALLED or not _COMETML_INSTALLED from composer import Trainer - from composer.loggers import WandBLogger, CometMLLogger, MLFlowLogger, FileLogger + from composer.loggers import WandBLogger, CometMLLogger, MLFlowLogger, NeptuneLogger, FileLogger wandb_logger = WandBLogger() cometml_logger = CometMLLogger() @@ -73,6 +73,7 @@ Available Loggers ~wandb_logger.WandBLogger ~mlflow_logger.MLFlowLogger ~cometml_logger.CometMLLogger + ~neptune_logger.NeptuneLogger ~progress_bar_logger.ProgressBarLogger ~tensorboard_logger.TensorboardLogger ~in_memory_logger.InMemoryLogger diff --git a/setup.py b/setup.py index c87feaf05e2..c17b3014910 100644 --- a/setup.py +++ b/setup.py @@ -163,6 +163,10 @@ def package_files(prefix: str, directory: str, extension: str): 'comet_ml>=3.31.12,<4.0.0', ] +extra_deps['neptune'] = [ + 'neptune>=1.6.2,<2.0.0', +] + extra_deps['tensorboard'] = [ 'tensorboard>=2.9.1,<3.0.0', ] diff --git a/tests/callbacks/callback_settings.py b/tests/callbacks/callback_settings.py index 55ad1c641c9..22a4a09b582 100644 --- a/tests/callbacks/callback_settings.py +++ b/tests/callbacks/callback_settings.py @@ -14,8 +14,8 @@ from composer.callbacks import (EarlyStopper, ExportForInferenceCallback, FreeOutputs, Generate, ImageVisualizer, MemoryMonitor, MemorySnapshot, MLPerfCallback, OOMObserver, SpeedMonitor, SystemMetricsMonitor, ThresholdStopper) -from composer.loggers import (CometMLLogger, ConsoleLogger, LoggerDestination, MLFlowLogger, ProgressBarLogger, - RemoteUploaderDownloader, TensorboardLogger, WandBLogger) +from composer.loggers import (CometMLLogger, ConsoleLogger, LoggerDestination, MLFlowLogger, NeptuneLogger, + ProgressBarLogger, RemoteUploaderDownloader, TensorboardLogger, WandBLogger) from composer.models.base import ComposerModel from composer.utils import dist from composer.utils.device import get_device @@ -76,6 +76,13 @@ except ImportError: _PYNMVL_INSTALLED = False +try: + import neptune + _NEPTUNE_INSTALLED = True + del neptune # unused +except ImportError: + _NEPTUNE_INSTALLED = False + _callback_kwargs: Dict[Type[Callback], Dict[str, Any],] = { Generate: { 'prompts': ['a', 'b', 'c'], @@ -115,6 +122,9 @@ SpeedMonitor: { 'window_size': 1, }, + NeptuneLogger: { + 'mode': 'debug', + }, } _callback_marks: Dict[Type[Callback], List[pytest.MarkDecorator],] = { @@ -153,6 +163,7 @@ ImageVisualizer: [pytest.mark.skipif(not _WANDB_INSTALLED, reason='Wandb is optional')], MLFlowLogger: [pytest.mark.skipif(not _MLFLOW_INSTALLED, reason='mlflow is optional'),], SystemMetricsMonitor: [pytest.mark.skipif(not _PYNMVL_INSTALLED, reason='pynmvl is optional'),], + NeptuneLogger: [pytest.mark.skipif(not _NEPTUNE_INSTALLED, reason='neptune is optional'),], } diff --git a/tests/loggers/test_neptune_logger.py b/tests/loggers/test_neptune_logger.py new file mode 100644 index 00000000000..4463595c0ff --- /dev/null +++ b/tests/loggers/test_neptune_logger.py @@ -0,0 +1,149 @@ +# Copyright 2022 MosaicML Composer authors +# SPDX-License-Identifier: Apache-2.0 +import os +import uuid +from pathlib import Path +from typing import Sequence +from unittest.mock import MagicMock, patch + +import pytest +import torch +from torch.utils.data import DataLoader + +from composer import Trainer +from composer._version import __version__ +from composer.loggers import NeptuneLogger +from composer.utils import dist +from tests.common import RandomImageDataset, SimpleConvModel +from tests.common.markers import device + + +@pytest.fixture +def 
test_neptune_logger() -> NeptuneLogger: + neptune_project = 'test_project' + neptune_api_token = 'test_token' + + neptune_logger = NeptuneLogger( + project=neptune_project, + api_token=neptune_api_token, + rank_zero_only=False, + mode='debug', + upload_artifacts=True, + ) + + return neptune_logger + + +def test_neptune_init(test_neptune_logger): + mock_state = MagicMock() + mock_state.run_name = 'dummy-run-name' # should appear in sys/tags + + test_neptune_logger.init(state=mock_state, logger=MagicMock()) + + assert test_neptune_logger.neptune_run is not None + + test_neptune_logger.neptune_run.sync() + assert test_neptune_logger.neptune_run[NeptuneLogger.integration_version_key].fetch() == __version__ + assert test_neptune_logger.neptune_run['sys/name'].fetch() == 'dummy-run-name' + assert test_neptune_logger.base_handler['rank'].fetch() == 0 + + +@device('cpu') +def test_neptune_logging(device, test_neptune_logger): + + dataset_size = 64 + batch_size = 4 + num_batches = 4 + eval_interval = '1ba' + + trainer = Trainer(model=SimpleConvModel(), + loggers=test_neptune_logger, + train_dataloader=DataLoader(RandomImageDataset(size=dataset_size), batch_size), + eval_dataloader=DataLoader(RandomImageDataset(size=dataset_size), batch_size), + max_duration=f'{num_batches}ba', + eval_interval=eval_interval, + device=device) + trainer.fit() + + assert test_neptune_logger.neptune_run is not None + assert test_neptune_logger.base_handler is not None + + for metric_name in [ + 'metrics/train/MulticlassAccuracy', 'metrics/eval/MulticlassAccuracy', 'metrics/eval/CrossEntropy', + 'loss/train/total' + ]: + path = f'{test_neptune_logger._base_namespace}/{test_neptune_logger.metric_namespace}/{metric_name}' + assert test_neptune_logger.neptune_run.exists(path) + + for hyperparam_name in ['node_name', 'num_cpus_per_node', 'num_nodes', 'rank_zero_seed']: + path = f'{test_neptune_logger._base_namespace}/{test_neptune_logger.hyperparam_namespace}/{hyperparam_name}' + assert test_neptune_logger.neptune_run.exists(path) + + assert test_neptune_logger.base_handler['hyperparameters/num_nodes'].fetch() == 1 + + +@pytest.mark.gpu +@pytest.mark.world_size(1, 2) +def test_upload_and_download_file(test_neptune_logger, tmp_path, dummy_state): + neptune_artifact_name = 'test-neptune-artifact-' + str(uuid.uuid4()) + tmp_paths = dist.all_gather_object(os.path.abspath(tmp_path)) + save_folder = Path(tmp_paths[0]) + file_content = 'hello from Neptune!' 
+ + dummy_neptune_artifact_path = save_folder / 'neptune_artifact.txt' + if dist.get_global_rank() == 0: + with open(dummy_neptune_artifact_path, 'w+') as f: + f.write(file_content) + + test_neptune_logger.upload_file(state=dummy_state, + file_path=dummy_neptune_artifact_path, + remote_file_name=neptune_artifact_name) + + dist.barrier() + + assert test_neptune_logger.neptune_run.exists(f'{test_neptune_logger._base_namespace}/{neptune_artifact_name}') + + dst_path = save_folder / 'neptune_artifact' + + test_neptune_logger.download_file( + remote_file_name=neptune_artifact_name, + destination=str(dst_path), + ) + + assert dst_path.exists() + + with open(str(dst_path), 'r') as fp: + assert fp.read() == file_content + + +def test_neptune_log_image(test_neptune_logger): + pytest.importorskip('neptune', reason='neptune is optional') + + with patch('neptune.attributes.FileSeries.extend', MagicMock()) as mock_extend: + image_variants = [ + (torch.rand(4, 4), False), # 2D image + (torch.rand(2, 3, 4, 4), False), # multiple images, not channels last + (torch.rand(2, 3, 4, 4, dtype=torch.float64), False), # same as above but with float64 + (torch.rand(3, 4, 4), False), # with channels, not channels last + ([torch.rand(4, 4, 3)], True), # with channels, channels last + (torch.rand(2, 4, 4, 3), True), # multiple images, channels last + ([torch.rand(4, 4, 3), torch.rand(4, 4, 3)], True) # multiple images in list + ] + + expected_num_images_total = 0 + for (images, channels_last) in image_variants: + if isinstance(images, Sequence): + expected_num_images = len(images) + np_images = [image.to(torch.float32).numpy() for image in images] + + else: + expected_num_images = 1 if images.ndim < 4 else images.shape[0] + np_images = images.to(torch.float32).numpy() + test_neptune_logger.log_images(images=images, channels_last=channels_last) + test_neptune_logger.log_images(images=np_images, channels_last=channels_last) + + expected_num_images *= 2 # One set of torch tensors, one set of numpy arrays + expected_num_images_total += expected_num_images + + test_neptune_logger.post_close() + assert mock_extend.call_count == 2 * len(image_variants) # One set of torch tensors, one set of numpy arrays From d2fef20734b7fe8a3d23d54ebb1403148db3525f Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Wed, 7 Feb 2024 15:17:49 -0500 Subject: [PATCH 04/16] fix (#2973) --- tests/algorithms/test_required_on_load.py | 2 +- tests/trainer/test_checkpoint.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/algorithms/test_required_on_load.py b/tests/algorithms/test_required_on_load.py index fd3fad06283..defaaa43899 100644 --- a/tests/algorithms/test_required_on_load.py +++ b/tests/algorithms/test_required_on_load.py @@ -164,7 +164,7 @@ def test_autoload(algo_name: str, load_weights_only: bool, already_added: bool, context = pytest.warns(UserWarning, match='Automatically adding required_on_load algorithm*') # Excluding some algorithms leads to errors when loading elif exclude: - if version.parse(torch.__version__) > version.parse('2.1.3'): + if version.parse(torch.__version__) > version.parse('2.2.9'): if algo_name in [ 'Alibi', 'BlurPool', 'Factorize', 'GatedLinearUnits', 'GhostBatchNorm', 'SqueezeExcite' ]: diff --git a/tests/trainer/test_checkpoint.py b/tests/trainer/test_checkpoint.py index 40a098ba1eb..76605742994 100644 --- a/tests/trainer/test_checkpoint.py +++ b/tests/trainer/test_checkpoint.py @@ -696,7 +696,7 @@ def test_strict_errors(self, missing_key: bool, unexpected_key: bool): last_checkpoint = 
os.path.join('first', 'ep2.pt') if missing_key or unexpected_key: message = r'Error\(s\) in loading state_dict' - if version.parse(torch.__version__) < version.parse('2.1.3'): + if version.parse(torch.__version__) < version.parse('2.2.9'): # Composer implements strict for older torch versions message = 'Failed to load checkpoint due to' error_context = pytest.raises(RuntimeError, match=message) From cb048e56eae2a8ce4eec049fa56b31d99b8b6340 Mon Sep 17 00:00:00 2001 From: Daniel King <43149077+dakinggg@users.noreply.github.com> Date: Wed, 7 Feb 2024 13:07:05 -0800 Subject: [PATCH 05/16] Add a register_model_with_run_id api to MLflowLogger (#2967) --- composer/loggers/mlflow_logger.py | 47 +++++++++++++++++++++++++ tests/loggers/test_mlflow_logger.py | 53 ++++++++++++++++++++++++++--- 2 files changed, 95 insertions(+), 5 deletions(-) diff --git a/composer/loggers/mlflow_logger.py b/composer/loggers/mlflow_logger.py index 880d1602d9c..ae40293e3af 100644 --- a/composer/loggers/mlflow_logger.py +++ b/composer/loggers/mlflow_logger.py @@ -346,6 +346,53 @@ def log_model(self, flavor: Literal['transformers'], **kwargs): else: raise NotImplementedError(f'flavor {flavor} not supported.') + def register_model_with_run_id( + self, + model_uri: str, + name: str, + await_creation_for: int = 300, + tags: Optional[Dict[str, Any]] = None, + ): + """Similar to ``register_model``, but uses a different MLflow API to allow passing in the run id. + + Args: + model_uri (str): The URI of the model to register. + name (str): The name of the model to register. Will be appended to ``model_registry_prefix``. + await_creation_for (int, optional): The number of seconds to wait for the model to be registered. Defaults to 300. + tags (Optional[Dict[str, Any]], optional): A dictionary of tags to add to the model. Defaults to None. + """ + if self._enabled: + from mlflow.exceptions import MlflowException + from mlflow.protos.databricks_pb2 import ALREADY_EXISTS, RESOURCE_ALREADY_EXISTS, ErrorCode + + full_name = f'{self.model_registry_prefix}.{name}' if len(self.model_registry_prefix) > 0 else name + + # This try/catch code is copied from + # https://github.com/mlflow/mlflow/blob/3ba1e50e90a38be19920cb9118593a43d7cfa90e/mlflow/tracking/_model_registry/fluent.py#L90-L103 + try: + create_model_response = self._mlflow_client.create_registered_model(full_name) + log.info(f'Successfully registered model {name} with {create_model_response.name}') + except MlflowException as e: + if e.error_code in ( + ErrorCode.Name(RESOURCE_ALREADY_EXISTS), + ErrorCode.Name(ALREADY_EXISTS), + ): + log.info(f'Registered model {name} already exists. 
Creating a new version of this model...') + else: + raise e + + create_version_response = self._mlflow_client.create_model_version( + name=full_name, + source=model_uri, + run_id=self._run_id, + await_creation_for=await_creation_for, + tags=tags, + ) + + log.info( + f'Successfully created model version {create_version_response.version} for model {create_version_response.name}' + ) + def log_images( self, images: Union[np.ndarray, torch.Tensor, Sequence[Union[np.ndarray, torch.Tensor]]], diff --git a/tests/loggers/test_mlflow_logger.py b/tests/loggers/test_mlflow_logger.py index ee68ab3584d..d5de5b8171e 100644 --- a/tests/loggers/test_mlflow_logger.py +++ b/tests/loggers/test_mlflow_logger.py @@ -432,11 +432,54 @@ def test_mlflow_register_model(tmp_path, monkeypatch): name='my_model', ) - assert mlflow.register_model.called_with(model_uri=local_mlflow_save_path, - name='my_catalog.my_schema.my_model', - await_registration_for=300, - tags=None, - registry_uri='databricks-uc') + mlflow.register_model.assert_called_with( + model_uri=local_mlflow_save_path, + name='my_catalog.my_schema.my_model', + await_registration_for=300, + tags=None, + ) + assert mlflow.get_registry_uri() == 'databricks-uc' + + test_mlflow_logger.post_close() + + +@pytest.mark.filterwarnings('ignore:.*Setuptools is replacing distutils.*:UserWarning') +@pytest.mark.filterwarnings("ignore:.*The 'transformers' MLflow Models integration.*:FutureWarning") +def test_mlflow_register_model_with_run_id(tmp_path, monkeypatch): + mlflow = pytest.importorskip('mlflow') + + mlflow_uri = tmp_path / Path('my-test-mlflow-uri') + mlflow_exp_name = 'test-log-model-exp-name' + test_mlflow_logger = MLFlowLogger( + tracking_uri=mlflow_uri, + experiment_name=mlflow_exp_name, + model_registry_prefix='my_catalog.my_schema', + model_registry_uri='databricks-uc', + ) + + monkeypatch.setattr(test_mlflow_logger._mlflow_client, 'create_model_version', MagicMock()) + monkeypatch.setattr(test_mlflow_logger._mlflow_client, 'create_registered_model', + MagicMock(return_value=type('MockResponse', (), {'name': 'my_catalog.my_schema.my_model'}))) + + mock_state = MagicMock() + mock_state.run_name = 'dummy-run-name' # this run name should be unused. 
+ mock_logger = MagicMock() + + local_mlflow_save_path = str(tmp_path / Path('my_model_local')) + test_mlflow_logger.init(state=mock_state, logger=mock_logger) + + test_mlflow_logger.register_model_with_run_id( + model_uri=local_mlflow_save_path, + name='my_model', + ) + + test_mlflow_logger._mlflow_client.create_model_version.assert_called_with( + name='my_catalog.my_schema.my_model', + source=local_mlflow_save_path, + run_id=test_mlflow_logger._run_id, + await_creation_for=300, + tags=None, + ) assert mlflow.get_registry_uri() == 'databricks-uc' test_mlflow_logger.post_close() From cfe06976ede7dc9d6423522425a7ac15a1826375 Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Wed, 7 Feb 2024 16:15:14 -0500 Subject: [PATCH 06/16] remove specifics (#2971) --- .github/CODEOWNERS | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a183caa01f7..b193288b3e6 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -20,8 +20,8 @@ /composer/algorithms/ @mosaicml/composer-team-eng /composer/cli/ @mosaicml/composer-team-eng /composer/datasets/ @mosaicml/composer-team-eng -/composer/functional/ @mosaicml/composer-team-eng @dblalock -/composer/loggers/ @mosaicml/composer-team-eng @eracah @dakinggg +/composer/functional/ @mosaicml/composer-team-eng +/composer/loggers/ @mosaicml/composer-team-eng /composer/loss/ @mosaicml/composer-team-eng /composer/metrics/ @mosaicml/composer-team-eng /composer/models/ @mosaicml/composer-team-eng From 07d53e0bacfa48101c03a312e46190bee206c1ea Mon Sep 17 00:00:00 2001 From: snarayan21 Date: Wed, 7 Feb 2024 14:27:00 -0800 Subject: [PATCH 07/16] before_load event added (#2974) Co-authored-by: Mihir Patel --- .../low_precision_groupnorm.py | 6 ++++-- .../low_precision_layernorm.py | 6 ++++-- composer/core/callback.py | 10 ++++++++++ composer/core/engine.py | 8 +++++--- composer/core/event.py | 13 ++++++++----- composer/trainer/trainer.py | 8 +++++--- docs/source/getting_started/welcome_tour.rst | 1 + docs/source/trainer/algorithms.rst | 1 + tests/test_events.py | 1 + 9 files changed, 39 insertions(+), 15 deletions(-) diff --git a/composer/algorithms/low_precision_groupnorm/low_precision_groupnorm.py b/composer/algorithms/low_precision_groupnorm/low_precision_groupnorm.py index 38f73a988e7..5cdad2c6c0d 100644 --- a/composer/algorithms/low_precision_groupnorm/low_precision_groupnorm.py +++ b/composer/algorithms/low_precision_groupnorm/low_precision_groupnorm.py @@ -52,8 +52,10 @@ class LowPrecisionGroupNorm(Algorithm): def __init__(self, apply_at: Event = Event.INIT): self.apply_at = apply_at - if self.apply_at not in {Event.INIT, Event.AFTER_LOAD}: - raise ValueError('LowPrecisionGroupNorm only supports application on Event.INIT and Event.AFTER_LOAD.') + if self.apply_at not in {Event.INIT, Event.BEFORE_LOAD, Event.AFTER_LOAD}: + raise ValueError( + 'LowPrecisionGroupNorm only supports application on Event.INIT, Event.BEFORE_LOAD, and Event.AFTER_LOAD.' 
+ ) def __repr__(self) -> str: return f'{self.__class__.__name__}(apply_at={self.apply_at})' diff --git a/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py b/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py index 54a6df1162f..64ffaebb11a 100644 --- a/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py +++ b/composer/algorithms/low_precision_layernorm/low_precision_layernorm.py @@ -52,8 +52,10 @@ class LowPrecisionLayerNorm(Algorithm): def __init__(self, apply_at: Event = Event.INIT): self.apply_at = apply_at - if self.apply_at not in {Event.INIT, Event.AFTER_LOAD}: - raise ValueError('LowPrecisionLayerNorm only supports application on Event.INIT and Event.AFTER_LOAD.') + if self.apply_at not in {Event.INIT, Event.BEFORE_LOAD, Event.AFTER_LOAD}: + raise ValueError( + 'LowPrecisionLayerNorm only supports application on Event.INIT, Event.BEFORE_LOAD, and Event.AFTER_LOAD.' + ) def __repr__(self) -> str: return f'{self.__class__.__name__}(apply_at={self.apply_at})' diff --git a/composer/core/callback.py b/composer/core/callback.py index c6132ee9d4d..68c170bcab7 100644 --- a/composer/core/callback.py +++ b/composer/core/callback.py @@ -105,6 +105,16 @@ def init(self, state: State, logger: Logger) -> None: del state, logger # unused pass + def before_load(self, state: State, logger: Logger) -> None: + """Called on the :attr:`.Event.BEFORE_LOAD` event. + + Args: + state (State): The training state. + logger (Logger): The logger. + """ + del state, logger # unused + pass + def after_load(self, state: State, logger: Logger) -> None: """Called on the :attr:`.Event.AFTER_LOAD` event. diff --git a/composer/core/engine.py b/composer/core/engine.py index d3dff93cb79..75d89d0f9a6 100644 --- a/composer/core/engine.py +++ b/composer/core/engine.py @@ -351,11 +351,13 @@ def register_pass(self, algorithm_pass: passes.AlgorithmPass, index: int = -1): def _assert_dataloader_and_duration_set(state: State, event: Event): # correctness checks that dataloader and max duration need to be set for certain events - # dataloader should be set on all events expect INIT/AFTER_LOAD/EVAL_STANDALONE_START/EVAL_STANDALONE_END - if event not in {Event.INIT, Event.AFTER_LOAD, Event.EVAL_STANDALONE_START, Event.EVAL_STANDALONE_END}: + # dataloader should be set on all events except INIT/BEFORE_LOAD/AFTER_LOAD/EVAL_STANDALONE_START/EVAL_STANDALONE_END + if event not in { + Event.INIT, Event.BEFORE_LOAD, Event.AFTER_LOAD, Event.EVAL_STANDALONE_START, Event.EVAL_STANDALONE_END + }: assert state.dataloader is not None, f'The trainer should have set state.dataloader for event {event}.' - if event != Event.INIT and event != Event.AFTER_LOAD and not event.is_predict and not event.is_eval: + if event != Event.INIT and event != Event.BEFORE_LOAD and event != Event.AFTER_LOAD and not event.is_predict and not event.is_eval: assert state.max_duration is not None, f'The trainer should have set state.max_duration for event {event}.' def _run_algorithms( diff --git a/composer/core/event.py b/composer/core/event.py index 4cda7fc9a81..cb05d393fff 100644 --- a/composer/core/event.py +++ b/composer/core/event.py @@ -18,6 +18,7 @@ class Event(StringEnum): .. code-block:: python # + # # # for epoch in range(NUM_EPOCHS): @@ -93,6 +94,7 @@ class Event(StringEnum): Attributes: INIT: Invoked in the constructor of :class:`~.trainer.Trainer`. Model surgery (see :mod:`~composer.utils.module_surgery`) typically occurs here. 
+ BEFORE_LOAD: Immediately before the checkpoint is loaded in :class:`~.trainer.Trainer`. AFTER_LOAD: Immediately after checkpoint is loaded in constructor of :class:`~.trainer.Trainer`. FIT_START: Invoked at the beginning of each call to :meth:`.Trainer.fit`. Dataset transformations typically occur here. @@ -142,6 +144,7 @@ class Event(StringEnum): """ INIT = 'init' + BEFORE_LOAD = 'before_load' AFTER_LOAD = 'after_load' FIT_START = 'fit_start' @@ -243,12 +246,12 @@ def is_eval(self) -> bool: return self.value.startswith('eval') -_BEFORE_EVENTS = (Event.FIT_START, Event.EPOCH_START, Event.BEFORE_DATALOADER, Event.BATCH_START, +_BEFORE_EVENTS = (Event.BEFORE_LOAD, Event.FIT_START, Event.EPOCH_START, Event.BEFORE_DATALOADER, Event.BATCH_START, Event.BEFORE_TRAIN_BATCH, Event.BEFORE_FORWARD, Event.BEFORE_LOSS, Event.BEFORE_BACKWARD, Event.EVAL_BEFORE_ALL, Event.EVAL_START, Event.EVAL_BATCH_START, Event.EVAL_BEFORE_FORWARD, Event.PREDICT_START, Event.PREDICT_BATCH_START, Event.PREDICT_BEFORE_FORWARD, Event.EVAL_STANDALONE_START) -_AFTER_EVENTS = (Event.EPOCH_END, Event.BATCH_END, Event.AFTER_DATALOADER, Event.AFTER_TRAIN_BATCH, Event.AFTER_FORWARD, - Event.AFTER_LOSS, Event.AFTER_BACKWARD, Event.EVAL_AFTER_ALL, Event.EVAL_END, Event.EVAL_BATCH_END, - Event.EVAL_AFTER_FORWARD, Event.FIT_END, Event.PREDICT_END, Event.PREDICT_BATCH_END, - Event.PREDICT_AFTER_FORWARD, Event.EVAL_STANDALONE_END) +_AFTER_EVENTS = (Event.AFTER_LOAD, Event.EPOCH_END, Event.BATCH_END, Event.AFTER_DATALOADER, Event.AFTER_TRAIN_BATCH, + Event.AFTER_FORWARD, Event.AFTER_LOSS, Event.AFTER_BACKWARD, Event.EVAL_AFTER_ALL, Event.EVAL_END, + Event.EVAL_BATCH_END, Event.EVAL_AFTER_FORWARD, Event.FIT_END, Event.PREDICT_END, + Event.PREDICT_BATCH_END, Event.PREDICT_AFTER_FORWARD, Event.EVAL_STANDALONE_END) diff --git a/composer/trainer/trainer.py b/composer/trainer/trainer.py index 5dd97bda64c..ebadb589440 100644 --- a/composer/trainer/trainer.py +++ b/composer/trainer/trainer.py @@ -1399,6 +1399,8 @@ def __init__( if 'optimizers' in self.state.serialized_attributes: self.state.serialized_attributes.remove('optimizers') + self.engine.run_event(Event.BEFORE_LOAD) + # Load Checkpoint self._rng_state = None # If autoresume is enabled, first check for existing checkpoints to load @@ -1513,9 +1515,9 @@ def __init__( self.engine.run_event(Event.AFTER_LOAD) # reseed here. This helps with a couple of issues: - # 1. rng state may change at Event.INIT/Event.AFTER_LOAD. For example, if an algorithm - # creates a new module and module parameters are initialized randomly, rng state will - # change. This reseeding nullifies such effects. + # 1. rng state may change at Event.INIT/Event.BEFORE_LOAD/Event.AFTER_LOAD. For example, + # if an algorithm creates a new module and module parameters are initialized randomly, rng + # state will change. This reseeding nullifies such effects. # 2. While resuming from a checkpoint, we want to spin dataloader and bring it back to the # same state as at the time of the checkpoint. Therefore, spinning needs to start from the # same rng state as in the original run. diff --git a/docs/source/getting_started/welcome_tour.rst b/docs/source/getting_started/welcome_tour.rst index a46dc85f339..649a9c87b0e 100644 --- a/docs/source/getting_started/welcome_tour.rst +++ b/docs/source/getting_started/welcome_tour.rst @@ -65,6 +65,7 @@ We could add events to our training loop as follows: .. 
code-block:: python # + # # # for epoch in range(NUM_EPOCHS): diff --git a/docs/source/trainer/algorithms.rst b/docs/source/trainer/algorithms.rst index 8021034ab8d..a494799dded 100644 --- a/docs/source/trainer/algorithms.rst +++ b/docs/source/trainer/algorithms.rst @@ -168,6 +168,7 @@ Composer’s `events` look as follows: state.model = model() state.train_dataloader = train_dataloader() state.optimizers = optimizers() + EVENT.BEFORE_LOAD load_checkpoint() EVENT.AFTER_LOAD EVENT.FIT_START diff --git a/tests/test_events.py b/tests/test_events.py index 8f2c11897a8..c81feea0b06 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -150,6 +150,7 @@ def _assert_expected_event_calls(self, trainer: Trainer, eval_interval: Time, nu expected_num_calls = { Event.INIT: 1, + Event.BEFORE_LOAD: 1, Event.AFTER_LOAD: 1, Event.EPOCH_START: num_epochs, Event.BATCH_START: total_steps, From 450994b4034e0bcbf105ec9ee6d69d213f5f42dd Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Wed, 7 Feb 2024 19:52:40 -0500 Subject: [PATCH 08/16] fix (#2975) --- .github/workflows/daily.yaml | 4 +--- tests/trainer/test_checkpoint.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/daily.yaml b/.github/workflows/daily.yaml index 25f686cf897..0b428a39b20 100644 --- a/.github/workflows/daily.yaml +++ b/.github/workflows/daily.yaml @@ -83,7 +83,6 @@ jobs: aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} wandb-api-key: ${{ secrets.WANDB_API_KEY }} - slack-notifications-bot-token: ${{ secrets.SLACK_NOTIFICATIONS_BOT_TOKEN }} code-eval-device: ${{ secrets.CODE_EVAL_DEVICE }} code-eval-url: ${{ secrets.CODE_EVAL_URL }} code-eval-apikey: ${{ secrets.CODE_EVAL_APIKEY }} @@ -115,7 +114,7 @@ jobs: pytest_command: "coverage run -m pytest" composer_package_name: "mosaicml" - name: "gpu-3.10-2.2" - container: mosaicml/pytorch:2.2.0_cu121-python3.10-ubuntu20.04 + container: mosaicml/pytorch:2.2.0_cu121-python3.11-ubuntu20.04 markers: "(daily or not daily) and (remote or not remote) and gpu and (doctest or not doctest)" pytest_command: "coverage run -m pytest" composer_package_name: "mosaicml" @@ -131,4 +130,3 @@ jobs: python-version: 3.9 secrets: mcloud-api-key: ${{ secrets.MCLOUD_DAILY_API_KEY }} - slack-notifications-bot-token: ${{ secrets.SLACK_NOTIFICATIONS_BOT_TOKEN }} diff --git a/tests/trainer/test_checkpoint.py b/tests/trainer/test_checkpoint.py index 76605742994..7d74763cbb3 100644 --- a/tests/trainer/test_checkpoint.py +++ b/tests/trainer/test_checkpoint.py @@ -1020,7 +1020,7 @@ def test_autoload_algorithm_old_checkpoint(self): NoOpModel.__init__ = lambda self, x: None # type: ignore NoOpModel.__repr__ = lambda self: 'NoOpModel(3)' error_context = pytest.raises(KeyError, match='module.0.weight') - if version.parse(torch.__version__) < version.parse('2.1.3'): + if version.parse(torch.__version__) < version.parse('2.2.9'): error_context = pytest.raises(ValueError, match='loaded state dict contains a parameter group.*') with pytest.warns(UserWarning, match='required_on_load algorithm.*'), error_context: trainer_3 = self.get_trainer(load_path=os.path.join('first', 'ep1.pt'),) From bd4462e57b6b9937fb87f98a84a4c3870c670ab8 Mon Sep 17 00:00:00 2001 From: Shashank Rajput <144760128+ShashankMosaicML@users.noreply.github.com> Date: Thu, 8 Feb 2024 12:05:14 -0800 Subject: [PATCH 09/16] Adding the step argument to logger.log_table (#2961) * added the step argument to logger.log_table * .. * .. 
--- composer/loggers/cometml_logger.py | 7 ++++++- composer/loggers/console_logger.py | 7 ++++++- composer/loggers/file_logger.py | 7 ++++++- composer/loggers/in_memory_logger.py | 7 ++++++- composer/loggers/logger.py | 10 ++++++++-- composer/loggers/logger_destination.py | 12 ++++++++++-- composer/loggers/mlflow_logger.py | 7 ++++++- composer/loggers/wandb_logger.py | 8 ++++++-- 8 files changed, 54 insertions(+), 11 deletions(-) diff --git a/composer/loggers/cometml_logger.py b/composer/loggers/cometml_logger.py index 3581d862d63..01b814e6b7b 100644 --- a/composer/loggers/cometml_logger.py +++ b/composer/loggers/cometml_logger.py @@ -98,7 +98,12 @@ def init(self, state: State, logger: Logger) -> None: assert self.experiment is not None self.experiment.set_name(self.name) - def log_table(self, columns: List[str], rows: List[List[Any]], name: str = 'Table') -> None: + def log_table(self, + columns: List[str], + rows: List[List[Any]], + name: str = 'Table', + step: Optional[int] = None) -> None: + del step if self._enabled: assert self.experiment is not None try: diff --git a/composer/loggers/console_logger.py b/composer/loggers/console_logger.py index df97cdff041..4e6fc8a3d82 100644 --- a/composer/loggers/console_logger.py +++ b/composer/loggers/console_logger.py @@ -77,7 +77,12 @@ def log_hyperparameters(self, hyperparameters: Dict[str, Any]): # Lazy logging of hyperparameters. self.hparams.update(hyperparameters) - def log_table(self, columns: List[str], rows: List[List[Any]], name: str = 'Table') -> None: + def log_table(self, + columns: List[str], + rows: List[List[Any]], + name: str = 'Table', + step: Optional[int] = None) -> None: + del step try: import pandas as pd except ImportError as e: diff --git a/composer/loggers/file_logger.py b/composer/loggers/file_logger.py index 8d88a0deae8..c6d77d102cb 100644 --- a/composer/loggers/file_logger.py +++ b/composer/loggers/file_logger.py @@ -185,7 +185,12 @@ def log_traces(self, traces: Dict[str, Any]): trace_str + '\n', ) - def log_table(self, columns: List[str], rows: List[List[Any]], name: str = 'Table') -> None: + def log_table(self, + columns: List[str], + rows: List[List[Any]], + name: str = 'Table', + step: Optional[int] = None) -> None: + del step try: import pandas as pd except ImportError as e: diff --git a/composer/loggers/in_memory_logger.py b/composer/loggers/in_memory_logger.py index 0bf024f8c24..8f5a2c0ea3a 100644 --- a/composer/loggers/in_memory_logger.py +++ b/composer/loggers/in_memory_logger.py @@ -72,7 +72,12 @@ def __init__(self) -> None: def log_hyperparameters(self, hyperparameters: Dict[str, Any]): self.hyperparameters.update(hyperparameters) - def log_table(self, columns: List[str], rows: List[List[Any]], name: str = 'Table') -> None: + def log_table(self, + columns: List[str], + rows: List[List[Any]], + name: str = 'Table', + step: Optional[int] = None) -> None: + del step try: import pandas as pd except ImportError as e: diff --git a/composer/loggers/logger.py b/composer/loggers/logger.py index ee84596e3d9..f341ab375b1 100644 --- a/composer/loggers/logger.py +++ b/composer/loggers/logger.py @@ -60,9 +60,15 @@ def log_hyperparameters(self, parameters: Dict[str, Any]): for destination in self.destinations: destination.log_hyperparameters(parameters) - def log_table(self, columns: List[str], rows: List[List[Any]], name: str = 'Table') -> None: + def log_table(self, + columns: List[str], + rows: List[List[Any]], + name: str = 'Table', + step: Optional[int] = None) -> None: + if step is None: + step = 
self._state.timestamp.batch.value for destination in self.destinations: - destination.log_table(columns, rows, name) + destination.log_table(columns, rows, name, step) def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None: if step is None: diff --git a/composer/loggers/logger_destination.py b/composer/loggers/logger_destination.py index 0ce862cea6d..df41157dc6f 100644 --- a/composer/loggers/logger_destination.py +++ b/composer/loggers/logger_destination.py @@ -58,15 +58,23 @@ def log_hyperparameters(self, hyperparameters: Dict[str, Any]): del hyperparameters # unused pass - def log_table(self, columns: List[str], rows: List[List[Any]], name: str = 'Table') -> None: + def log_table(self, + columns: List[str], + rows: List[List[Any]], + name: str = 'Table', + step: Optional[int] = None) -> None: """Log a table. Args: columns (List[str]): Names of the columns in the table. rows (List[List[Any]]): 2D row-oriented array of values. name (str): Name of table. (Default: ``'Table'``) + step (Optional[int], optional): The current step or batch of training at the + time of logging. Defaults to None. If not specified the specific + LoggerDestination implementation will choose a step (usually a running + counter). """ - del columns, rows, name + del columns, rows, name, step pass def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None: diff --git a/composer/loggers/mlflow_logger.py b/composer/loggers/mlflow_logger.py index ae40293e3af..0e13791d64e 100644 --- a/composer/loggers/mlflow_logger.py +++ b/composer/loggers/mlflow_logger.py @@ -188,7 +188,12 @@ def init(self, state: State, logger: Logger) -> None: def after_load(self, state: State, logger: Logger) -> None: logger.log_hyperparameters({'mlflow_experiment_id': self._experiment_id, 'mlflow_run_id': self._run_id}) - def log_table(self, columns: List[str], rows: List[List[Any]], name: str = 'Table') -> None: + def log_table(self, + columns: List[str], + rows: List[List[Any]], + name: str = 'Table', + step: Optional[int] = None) -> None: + del step if self._enabled: try: import pandas as pd diff --git a/composer/loggers/wandb_logger.py b/composer/loggers/wandb_logger.py index d75f9d90168..7fc5fdcada6 100644 --- a/composer/loggers/wandb_logger.py +++ b/composer/loggers/wandb_logger.py @@ -117,11 +117,15 @@ def log_hyperparameters(self, hyperparameters: Dict[str, Any]): import wandb wandb.config.update(hyperparameters) - def log_table(self, columns: List[str], rows: List[List[Any]], name: str = 'Table') -> None: + def log_table(self, + columns: List[str], + rows: List[List[Any]], + name: str = 'Table', + step: Optional[int] = None) -> None: if self._enabled: import wandb table = wandb.Table(columns=columns, rows=rows) - wandb.log({name: table}) + wandb.log({name: table}, step) def log_metrics(self, metrics: Dict[str, Any], step: Optional[int] = None) -> None: if self._enabled: From 2cc99e78d7c1cd2f866dc1e2131eb0623d060b0e Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Thu, 8 Feb 2024 15:07:29 -0500 Subject: [PATCH 10/16] fix tests (#2980) --- .github/workflows/pytest-cpu.yaml | 11 ----------- .github/workflows/pytest-gpu.yaml | 11 ----------- pyproject.toml | 4 +++- 3 files changed, 3 insertions(+), 23 deletions(-) diff --git a/.github/workflows/pytest-cpu.yaml b/.github/workflows/pytest-cpu.yaml index 1125202c4a6..3f237424ba8 100644 --- a/.github/workflows/pytest-cpu.yaml +++ b/.github/workflows/pytest-cpu.yaml @@ -89,14 +89,3 @@ jobs: with: name: coverage-${{ github.sha }}-${{ inputs.name 
}} path: .coverage - - name: Notify slack fail - if: > - failure() && !cancelled() && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev') && - (github.event_name != 'pull_request' && github.event_name != 'pull_request_target') - env: - SLACK_BOT_TOKEN: ${{ secrets.slack-notifications-bot-token }} - uses: voxmedia/github-action-slack-notify-build@v1 - with: - channel: composer-issues - status: FAILED - color: danger diff --git a/.github/workflows/pytest-gpu.yaml b/.github/workflows/pytest-gpu.yaml index d3d899f0e30..550a3067465 100644 --- a/.github/workflows/pytest-gpu.yaml +++ b/.github/workflows/pytest-gpu.yaml @@ -87,14 +87,3 @@ jobs: python .github/mcli/mcli_pytest.py --image '${{ inputs.container }}' --pip_package_name \ '${{ inputs.composer_package_name }}' --pytest_markers '${{ inputs.pytest-markers }}' --pytest_command \ '${{ inputs.pytest-command }}' --timeout ${{ inputs.mcloud-timeout }} ${REF_ARGS} - - name: Notify slack fail - if: > - failure() && !cancelled() && (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev') && - (github.event_name != 'pull_request' && github.event_name != 'pull_request_target') - env: - SLACK_BOT_TOKEN: ${{ secrets.slack-notifications-bot-token }} - uses: voxmedia/github-action-slack-notify-build@v1 - with: - channel: composer-issues - status: FAILED - color: danger diff --git a/pyproject.toml b/pyproject.toml index a4800ea34b5..15834406400 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -151,7 +151,9 @@ filterwarnings = [ # Ignore torch pytree deprecated warnings '''ignore:torch.utils._pytree._register_pytree_node is deprecated.*:UserWarning''', # Ignore autograd kernel warning inside DeepSpeed - '''ignore:.*an autograd kernel was not registered to the Autograd key.*:UserWarning''' + '''ignore:.*an autograd kernel was not registered to the Autograd key.*:UserWarning''', + # Ignore save_state_dict / load_state_dict deprecation warnings + '''ignore:'.*_state_dict' is deprecated and will be removed in future versions.*:UserWarning''' ] # Coverage From c497565438c071bb218cb3411183e4b13d323e9b Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Thu, 8 Feb 2024 15:07:48 -0500 Subject: [PATCH 11/16] lint (#2978) --- composer/utils/checkpoint.py | 17 +++++++++-------- composer/utils/file_helpers.py | 13 ++++++++++--- tests/trainer/test_checkpoint.py | 30 ++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 11 deletions(-) diff --git a/composer/utils/checkpoint.py b/composer/utils/checkpoint.py index 2dff2c2eeee..88c9fa5a5c2 100644 --- a/composer/utils/checkpoint.py +++ b/composer/utils/checkpoint.py @@ -28,7 +28,7 @@ from composer.utils import dist, reproducibility from composer.utils.file_helpers import (FORMAT_NAME_WITH_DIST_AND_TIME_TABLE, format_name_with_dist, format_name_with_dist_and_time, get_file, is_tar) -from composer.utils.misc import is_model_deepspeed +from composer.utils.misc import is_model_deepspeed, partial_format from composer.utils.object_store import ObjectStore if TYPE_CHECKING: @@ -355,6 +355,7 @@ def load_checkpoint( Optional[list[dict[str, Any]]]: The RNG state dicts, indexed by global rank, if :attr:`load_weights_only` is not None. Otherwise, None. 
""" + path = partial_format(path, run_name=state.run_name) using_legacy_sharded = False if state.fsdp_elastic_sharded_enabled: assert object_store is None or isinstance( @@ -608,7 +609,7 @@ def download_checkpoint(path: str, checkpoint_is_sharded = fsdp_sharded_state_dict_enabled or deepspeed_sharded_checkpoint try: if not checkpoint_is_sharded and dist.get_local_rank() == 0: - # if the checkpoint is not sharded, then local rank 0 on each node needs to download the + # If the checkpoint is not sharded, then local rank 0 on each node needs to download the # global rank 0 checkpoint path = _format_path_with_rank_zero(path) get_file(destination=rank_zero_checkpoint_filepath, @@ -625,18 +626,18 @@ def download_checkpoint(path: str, # or could not be downloaded raise RuntimeError(f'Checkpoint {path} does not exist') elif checkpoint_is_sharded: - # if the checkpoint is sharded, then every rank needs to download its own checkpoint + # If the checkpoint is sharded, then every rank needs to download its own checkpoint + path = _format_path_with_current_rank(path) try: get_file(destination=rank_n_checkpoint_filepath, - path=_format_path_with_current_rank(path), + path=path, object_store=object_store, progress_bar=progress_bar) except FileNotFoundError as e: raise FileNotFoundError( - (f'Checkpoint {_format_path_with_current_rank(path)} does not exist, ' - f'but is required for sharded checkpointing on rank {dist.get_global_rank()}. ' - 'Please ensure that the checkpoint exists and your load_path was specified as a format string ' - 'with the {rank} argument.')) from e + (f'Checkpoint {path} does not exist, but is required for sharded checkpointing ' + f'on rank {dist.get_global_rank()}. Please ensure that the checkpoint exists ' + 'and your load_path was specified as a format string with the {rank} argument.')) from e if extracted_checkpoint_folder is not None: try: diff --git a/composer/utils/file_helpers.py b/composer/utils/file_helpers.py index c42aa7ce6f1..7c75b4633e4 100644 --- a/composer/utils/file_helpers.py +++ b/composer/utils/file_helpers.py @@ -32,9 +32,16 @@ log = logging.getLogger(__name__) __all__ = [ - 'get_file', 'ensure_folder_is_empty', 'ensure_folder_has_no_conflicting_files', 'format_name_with_dist', - 'format_name_with_dist_and_time', 'is_tar', 'create_symlink_file', 'maybe_create_object_store_from_uri', - 'maybe_create_remote_uploader_downloader_from_uri', 'parse_uri' + 'get_file', + 'ensure_folder_is_empty', + 'ensure_folder_has_no_conflicting_files', + 'format_name_with_dist', + 'format_name_with_dist_and_time', + 'is_tar', + 'create_symlink_file', + 'maybe_create_object_store_from_uri', + 'maybe_create_remote_uploader_downloader_from_uri', + 'parse_uri', ] diff --git a/tests/trainer/test_checkpoint.py b/tests/trainer/test_checkpoint.py index 7d74763cbb3..36aed8c9c63 100644 --- a/tests/trainer/test_checkpoint.py +++ b/tests/trainer/test_checkpoint.py @@ -1295,6 +1295,36 @@ def test_spin_dataloaders( save_folder / 'second' / 'latest-rank{rank}.pt', ) + def test_format_load_path(self, tmp_path: pathlib.Path): + run_name = 'a-quick-rabbit' + save_folder = os.path.join(tmp_path, '{run_name}') + trainer = self.get_trainer( + run_name=run_name, + save_folder=os.path.join(save_folder, 'first'), + save_filename='ep{epoch}-rank{rank}.pt', + save_interval='1ep', + ) + + trainer.fit() + trainer.close() + + resume_file = os.path.join(save_folder, 'first', 'ep1-rank0.pt') + trainer = self.get_trainer( + run_name=run_name, + save_folder=os.path.join(save_folder, 'second'), + 
save_filename='ep{epoch}-rank{rank}.pt', + save_interval='1ep', + load_path=resume_file, # <-- resume training from file + ) + trainer.fit() + trainer.close() + + save_folder = save_folder.replace('{run_name}', run_name) + _assert_checkpoints_equivalent( + os.path.join(save_folder, 'first', 'latest-rank{rank}.pt'), + os.path.join(save_folder, 'second', 'latest-rank{rank}.pt'), + ) + def _assert_expected_num_checkpoints( self, save_folder: str, From cee24953a8e77183c0eb8cb974ddb84b9c010d7c Mon Sep 17 00:00:00 2001 From: Mihir Patel Date: Thu, 8 Feb 2024 15:09:06 -0500 Subject: [PATCH 12/16] bump version (#2979) --- composer/_version.py | 2 +- docker/README.md | 4 ++-- docker/build_matrix.yaml | 12 ++++++------ docker/generate_build_matrix.py | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/composer/_version.py b/composer/_version.py index 054fc70d0a3..6a46c95e084 100644 --- a/composer/_version.py +++ b/composer/_version.py @@ -3,4 +3,4 @@ """The Composer Version.""" -__version__ = '0.19.0' +__version__ = '0.19.1' diff --git a/docker/README.md b/docker/README.md index a450cb24e1d..62a265adf8a 100644 --- a/docker/README.md +++ b/docker/README.md @@ -15,8 +15,8 @@ all dependencies for both NLP and Vision models. They are built on top of the | Composer Version | CUDA Support | Docker Tag | |--------------------|----------------|----------------------------------------------------------------| -| 0.19.0 | Yes | `mosaicml/composer:latest`, `mosaicml/composer:0.19.0` | -| 0.19.0 | No | `mosaicml/composer:latest_cpu`, `mosaicml/composer:0.19.0_cpu` | +| 0.19.1 | Yes | `mosaicml/composer:latest`, `mosaicml/composer:0.19.1` | +| 0.19.1 | No | `mosaicml/composer:latest_cpu`, `mosaicml/composer:0.19.1_cpu` | **Note**: For a lightweight installation, we recommended using a [MosaicML PyTorch Image](#pytorch-images) and manually diff --git a/docker/build_matrix.yaml b/docker/build_matrix.yaml index 96250fa9644..2dd849432c9 100644 --- a/docker/build_matrix.yaml +++ b/docker/build_matrix.yaml @@ -246,9 +246,9 @@ TORCHVISION_VERSION: 0.18.0 - AWS_OFI_NCCL_VERSION: '' BASE_IMAGE: nvidia/cuda:12.1.0-cudnn8-devel-ubuntu20.04 - COMPOSER_INSTALL_COMMAND: mosaicml[all]==0.19.0 + COMPOSER_INSTALL_COMMAND: mosaicml[all]==0.19.1 CUDA_VERSION: 12.1.0 - IMAGE_NAME: composer-0-19-0 + IMAGE_NAME: composer-0-19-1 MOFED_VERSION: 5.5-1.0.3.2 NVIDIA_REQUIRE_CUDA_OVERRIDE: cuda>=12.1 brand=tesla,driver>=450,driver<451 brand=tesla,driver>=470,driver<471 brand=unknown,driver>=470,driver<471 brand=nvidia,driver>=470,driver<471 brand=nvidiartx,driver>=470,driver<471 @@ -269,15 +269,15 @@ PYTORCH_NIGHTLY_VERSION: '' PYTORCH_VERSION: 2.1.2 TAGS: - - mosaicml/composer:0.19.0 + - mosaicml/composer:0.19.1 - mosaicml/composer:latest TARGET: composer_stage TORCHVISION_VERSION: 0.16.2 - AWS_OFI_NCCL_VERSION: '' BASE_IMAGE: ubuntu:20.04 - COMPOSER_INSTALL_COMMAND: mosaicml[all]==0.19.0 + COMPOSER_INSTALL_COMMAND: mosaicml[all]==0.19.1 CUDA_VERSION: '' - IMAGE_NAME: composer-0-19-0-cpu + IMAGE_NAME: composer-0-19-1-cpu MOFED_VERSION: 5.5-1.0.3.2 NVIDIA_REQUIRE_CUDA_OVERRIDE: '' PYTHON_VERSION: '3.10' @@ -285,7 +285,7 @@ PYTORCH_NIGHTLY_VERSION: '' PYTORCH_VERSION: 2.1.2 TAGS: - - mosaicml/composer:0.19.0_cpu + - mosaicml/composer:0.19.1_cpu - mosaicml/composer:latest_cpu TARGET: composer_stage TORCHVISION_VERSION: 0.16.2 diff --git a/docker/generate_build_matrix.py b/docker/generate_build_matrix.py index 23674075725..3c85993b20d 100644 --- a/docker/generate_build_matrix.py +++ b/docker/generate_build_matrix.py @@ 
-261,7 +261,7 @@ def _main(): composer_entries = [] # The `GIT_COMMIT` is a placeholder and Jenkins will substitute it with the actual git commit for the `composer_staging` images - composer_versions = ['0.19.0'] # Only build images for the latest composer version + composer_versions = ['0.19.1'] # Only build images for the latest composer version composer_python_versions = [PRODUCTION_PYTHON_VERSION] # just build composer against the latest for product in itertools.product(composer_python_versions, composer_versions, cuda_options): From 327d68c39279cfa2b912f4fa5ce6902b1f235ace Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 8 Feb 2024 17:34:17 -0800 Subject: [PATCH 13/16] Fix UC object store bugfix (#2982) * push changes * pls work * update test * formatting * better err handling * try checking NotFound * isort * Remove extra filenotfound except --------- Co-authored-by: Jerry Chen --- composer/utils/object_store/uc_object_store.py | 18 +++++++++++------- .../utils/object_store/test_uc_object_store.py | 9 ++++----- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/composer/utils/object_store/uc_object_store.py b/composer/utils/object_store/uc_object_store.py index 2578e7151ba..9b08530888c 100644 --- a/composer/utils/object_store/uc_object_store.py +++ b/composer/utils/object_store/uc_object_store.py @@ -24,8 +24,9 @@ def _wrap_errors(uri: str, e: Exception): from databricks.sdk.core import DatabricksError + from databricks.sdk.errors.mapping import NotFound if isinstance(e, DatabricksError): - if e.error_code == _NOT_FOUND_ERROR_CODE: # type: ignore + if isinstance(e, NotFound) or e.error_code == _NOT_FOUND_ERROR_CODE: # type: ignore raise FileNotFoundError(f'Object {uri} not found') from e raise ObjectStoreTransientError from e @@ -48,6 +49,7 @@ class UCObjectStore(ObjectStore): """ _UC_VOLUME_LIST_API_ENDPOINT = '/api/2.0/fs/list' + _UC_VOLUME_FILES_API_ENDPOINT = '/api/2.0/fs/files' def __init__(self, path: str) -> None: try: @@ -206,13 +208,15 @@ def get_object_size(self, object_name: str) -> int: """ from databricks.sdk.core import DatabricksError try: - file_info = self.client.files.get_status(self._get_object_path(object_name)) - if file_info.is_dir: - raise IsADirectoryError(f'{object_name} is a UC directory, not a file.') - - assert file_info.file_size is not None - return file_info.file_size + # Note: The UC team is working on changes to fix the files.get_status API, but it currently + # does not work. Once fixed, we will call the files API endpoint. We currently only use this + # function in Composer and LLM-foundry to check the UC object's existence. 
+ self.client.api_client.do(method='HEAD', + path=f'{self._UC_VOLUME_FILES_API_ENDPOINT}/{self.prefix}/{object_name}', + headers={'Source': 'mosaicml/composer'}) + return 1000000 # Dummy value, as we don't have a way to get the size of the file except DatabricksError as e: + # If the code reaches here, the file was not found _wrap_errors(self.get_uri(object_name), e) return -1 diff --git a/tests/utils/object_store/test_uc_object_store.py b/tests/utils/object_store/test_uc_object_store.py index 0ca3dbbbd35..6d047d42ee8 100644 --- a/tests/utils/object_store/test_uc_object_store.py +++ b/tests/utils/object_store/test_uc_object_store.py @@ -78,13 +78,12 @@ def test_uc_object_store_invalid_prefix(monkeypatch): @pytest.mark.parametrize('result', ['success', 'not_found']) def test_get_object_size(ws_client, uc_object_store, result: str): if result == 'success': - db_files = pytest.importorskip('databricks.sdk.service.files') - ws_client.files.get_status.return_value = db_files.FileInfo(file_size=100) - assert uc_object_store.get_object_size('train.txt') == 100 + ws_client.api_client.do.return_value = {} + assert uc_object_store.get_object_size('train.txt') == 1000000 elif result == 'not_found': db_core = pytest.importorskip('databricks.sdk.core', reason='requires databricks') - ws_client.files.get_status.side_effect = db_core.DatabricksError('The file being accessed is not found', - error_code='NOT_FOUND') + ws_client.api_client.do.side_effect = db_core.DatabricksError('The file being accessed is not found', + error_code='NOT_FOUND') with pytest.raises(FileNotFoundError): uc_object_store.get_object_size('train.txt') else: From 2fd6dd6dd8b5b85c0d02dfa713a691351e92ba20 Mon Sep 17 00:00:00 2001 From: Nancy Hung Date: Thu, 8 Feb 2024 22:26:03 -0800 Subject: [PATCH 14/16] fml (#2988) --- composer/utils/object_store/uc_object_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer/utils/object_store/uc_object_store.py b/composer/utils/object_store/uc_object_store.py index 9b08530888c..3ffe197faca 100644 --- a/composer/utils/object_store/uc_object_store.py +++ b/composer/utils/object_store/uc_object_store.py @@ -212,7 +212,7 @@ def get_object_size(self, object_name: str) -> int: # does not work. Once fixed, we will call the files API endpoint. We currently only use this # function in Composer and LLM-foundry to check the UC object's existence. 
self.client.api_client.do(method='HEAD', - path=f'{self._UC_VOLUME_FILES_API_ENDPOINT}/{self.prefix}/{object_name}', + path=f'{self._UC_VOLUME_FILES_API_ENDPOINT}/{self._get_object_path(object_name)}', headers={'Source': 'mosaicml/composer'}) return 1000000 # Dummy value, as we don't have a way to get the size of the file except DatabricksError as e: From 1a22691a568883ab28217d6db3bc19226994c11a Mon Sep 17 00:00:00 2001 From: Daniel King <43149077+dakinggg@users.noreply.github.com> Date: Thu, 8 Feb 2024 23:25:21 -0800 Subject: [PATCH 15/16] Minor cleanup of UC get_object_size (#2989) --- composer/utils/object_store/uc_object_store.py | 6 +++--- tests/utils/object_store/test_uc_object_store.py | 8 ++++++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/composer/utils/object_store/uc_object_store.py b/composer/utils/object_store/uc_object_store.py index 3ffe197faca..4fc901212ad 100644 --- a/composer/utils/object_store/uc_object_store.py +++ b/composer/utils/object_store/uc_object_store.py @@ -211,9 +211,9 @@ def get_object_size(self, object_name: str) -> int: # Note: The UC team is working on changes to fix the files.get_status API, but it currently # does not work. Once fixed, we will call the files API endpoint. We currently only use this # function in Composer and LLM-foundry to check the UC object's existence. - self.client.api_client.do(method='HEAD', - path=f'{self._UC_VOLUME_FILES_API_ENDPOINT}/{self._get_object_path(object_name)}', - headers={'Source': 'mosaicml/composer'}) + object_path = self._get_object_path(object_name).lstrip('/') + path = os.path.join(self._UC_VOLUME_FILES_API_ENDPOINT, object_path) + self.client.api_client.do(method='HEAD', path=path, headers={'Source': 'mosaicml/composer'}) return 1000000 # Dummy value, as we don't have a way to get the size of the file except DatabricksError as e: # If the code reaches here, the file was not found diff --git a/tests/utils/object_store/test_uc_object_store.py b/tests/utils/object_store/test_uc_object_store.py index 6d047d42ee8..60845e43eb0 100644 --- a/tests/utils/object_store/test_uc_object_store.py +++ b/tests/utils/object_store/test_uc_object_store.py @@ -90,6 +90,14 @@ def test_get_object_size(ws_client, uc_object_store, result: str): raise NotImplementedError(f'Test for result={result} is not implemented.') +def test_get_object_size_full_path(ws_client, uc_object_store): + ws_client.api_client.do.return_value = {} + assert uc_object_store.get_object_size('Volumes/catalog/schema/volume/train.txt') == 1000000 + ws_client.api_client.do.assert_called_with(method='HEAD', + path=f'/api/2.0/fs/files/Volumes/catalog/schema/volume/train.txt', + headers={'Source': 'mosaicml/composer'}) + + def test_get_uri(uc_object_store): assert uc_object_store.get_uri('train.txt') == 'dbfs:/Volumes/catalog/schema/volume/train.txt' assert uc_object_store.get_uri('Volumes/catalog/schema/volume/checkpoint/model.bin' From 4238884e9e968f4ed0fc81364058c77dd664a87e Mon Sep 17 00:00:00 2001 From: Daniel King <43149077+dakinggg@users.noreply.github.com> Date: Fri, 9 Feb 2024 11:34:27 -0800 Subject: [PATCH 16/16] Pin databricks-sdk to 0.18.0 (#2990) --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index c17b3014910..a027340b1ee 100644 --- a/setup.py +++ b/setup.py @@ -224,7 +224,7 @@ def package_files(prefix: str, directory: str, extension: str): extra_deps['pandas'] = ['pandas>=2.0.0,<3.0'] -extra_deps['databricks'] = ['databricks-sdk>=0.15.0,<1.0'] +extra_deps['databricks'] = 
['databricks-sdk==0.18.0'] extra_deps['all'] = {dep for deps in extra_deps.values() for dep in deps}
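
Taken together, the Unity Catalog changes above (#2982, #2988, #2989) and this databricks-sdk pin (#2990) mean UCObjectStore.get_object_size now issues a HEAD request against the Files API and returns a placeholder size, so callers should treat it as an existence check rather than a true size. A rough, hypothetical usage sketch follows; the volume path and file name are made up, and it assumes Databricks credentials are configured and the pinned databricks-sdk==0.18.0 is installed:

    # Hypothetical illustration only -- not part of these patches.
    from composer.utils.object_store.uc_object_store import UCObjectStore

    # Made-up Unity Catalog volume path; requires valid Databricks credentials.
    store = UCObjectStore(path='Volumes/my_catalog/my_schema/my_volume/checkpoints')

    try:
        # After these changes the returned size is a dummy value (1000000);
        # the call is effectively an existence check via a HEAD request.
        store.get_object_size('latest-rank0.pt')
        print('checkpoint exists')
    except FileNotFoundError:
        print('checkpoint not found')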