From f5986ea7604b494695d5ba632150374c3fc503d1 Mon Sep 17 00:00:00 2001
From: Harsh Panchal <68880048+panchalhp-db@users.noreply.github.com>
Date: Wed, 27 Sep 2023 18:28:41 -0700
Subject: [PATCH] Add Databricks UC Volume Object Store (#2548)

- Add support for Databricks Unity Catalog Volumes as an object store.
---
 .../loggers/remote_uploader_downloader.py     |   3 +-
 composer/utils/__init__.py                    |   3 +-
 composer/utils/file_helpers.py                |  19 +-
 composer/utils/object_store/__init__.py       |   3 +-
 .../utils/object_store/uc_object_store.py     | 214 ++++++++++++++++++
 setup.py                                      |   2 +
 .../object_store/object_store_settings.py     |   6 +-
 .../object_store/test_uc_object_store.py      | 161 +++++++++++++
 tests/utils/test_file_helpers.py              |  21 ++
 9 files changed, 420 insertions(+), 12 deletions(-)
 create mode 100644 composer/utils/object_store/uc_object_store.py
 create mode 100644 tests/utils/object_store/test_uc_object_store.py

diff --git a/composer/loggers/remote_uploader_downloader.py b/composer/loggers/remote_uploader_downloader.py
index 19263b8ad4..0319e295c3 100644
--- a/composer/loggers/remote_uploader_downloader.py
+++ b/composer/loggers/remote_uploader_downloader.py
@@ -25,7 +25,7 @@
 from composer.loggers.logger import Logger
 from composer.loggers.logger_destination import LoggerDestination
 from composer.utils import (GCSObjectStore, LibcloudObjectStore, ObjectStore, ObjectStoreTransientError, OCIObjectStore,
-                            S3ObjectStore, SFTPObjectStore, dist, format_name_with_dist, get_file, retry)
+                            S3ObjectStore, SFTPObjectStore, UCObjectStore, dist, format_name_with_dist, get_file, retry)
 
 if TYPE_CHECKING:
     from composer.core import State
@@ -43,6 +43,7 @@ def _build_remote_backend(remote_backend_name: str, backend_kwargs: Dict[str, An
         'sftp': SFTPObjectStore,
         'libcloud': LibcloudObjectStore,
         'gs': GCSObjectStore,
+        'dbfs': UCObjectStore,
     }
     remote_backend_cls = remote_backend_name_to_cls.get(remote_backend_name, None)
     if remote_backend_cls is None:
diff --git a/composer/utils/__init__.py b/composer/utils/__init__.py
index 91711f464d..e081609767 100644
--- a/composer/utils/__init__.py
+++ b/composer/utils/__init__.py
@@ -22,7 +22,7 @@
 from composer.utils.misc import (get_free_tcp_port, is_model_deepspeed, is_model_fsdp, is_notebook, model_eval_mode,
                                  using_torch_2)
 from composer.utils.object_store import (GCSObjectStore, LibcloudObjectStore, ObjectStore, ObjectStoreTransientError,
-                                         OCIObjectStore, S3ObjectStore, SFTPObjectStore)
+                                         OCIObjectStore, S3ObjectStore, SFTPObjectStore, UCObjectStore)
 from composer.utils.retrying import retry
 from composer.utils.string_enum import StringEnum
 
@@ -43,6 +43,7 @@
     'SFTPObjectStore',
     'OCIObjectStore',
     'GCSObjectStore',
+    'UCObjectStore',
     'MissingConditionalImportError',
     'import_object',
     'is_model_deepspeed',
diff --git a/composer/utils/file_helpers.py b/composer/utils/file_helpers.py
index 9ed8283383..9b9664147a 100644
--- a/composer/utils/file_helpers.py
+++ b/composer/utils/file_helpers.py
@@ -20,7 +20,7 @@
 
 from composer.utils import dist
 from composer.utils.iter_helpers import iterate_with_callback
-from composer.utils.object_store import GCSObjectStore, ObjectStore, OCIObjectStore, S3ObjectStore
+from composer.utils.object_store import GCSObjectStore, ObjectStore, OCIObjectStore, S3ObjectStore, UCObjectStore
 
 if TYPE_CHECKING:
     from composer.core import Timestamp
@@ -337,7 +337,7 @@ def maybe_create_object_store_from_uri(uri: str) -> Optional[ObjectStore]:
     Returns:
         Optional[ObjectStore]: Returns an :class:`composer.utils.ObjectStore` if the URI is of a supported format, otherwise None
     """
-    backend, bucket_name, _ = parse_uri(uri)
+    backend, bucket_name, path = parse_uri(uri)
     if backend == '':
         return None
     if backend == 's3':
@@ -349,9 +349,13 @@ def maybe_create_object_store_from_uri(uri: str) -> Optional[ObjectStore]:
         return GCSObjectStore(bucket=bucket_name)
     elif backend == 'oci':
         return OCIObjectStore(bucket=bucket_name)
+    elif backend == 'dbfs':
+        # validate if the path conforms to the requirements for UC volume paths
+        UCObjectStore.validate_path(path)
+        return UCObjectStore(path=path)
     else:
         raise NotImplementedError(f'There is no implementation for the cloud backend {backend} via URI. Please use '
-                                  's3 or one of the supported object stores')
+                                  'one of the supported object stores')
 
 
 def maybe_create_remote_uploader_downloader_from_uri(
@@ -372,7 +376,7 @@ def maybe_create_remote_uploader_downloader_from_uri(
     """
     from composer.loggers import RemoteUploaderDownloader
     existing_remote_uds = [logger_dest for logger_dest in loggers if isinstance(logger_dest, RemoteUploaderDownloader)]
-    backend, bucket_name, _ = parse_uri(uri)
+    backend, bucket_name, path = parse_uri(uri)
     if backend == '':
         return None
     for existing_remote_ud in existing_remote_uds:
@@ -387,10 +391,13 @@ def maybe_create_remote_uploader_downloader_from_uri(
     elif backend == 'wandb':
         raise NotImplementedError(f'There is no implementation for WandB via URI. Please use '
                                   'WandBLogger with log_artifacts set to True')
-
+    elif backend == 'dbfs':
+        # validate if the path conforms to the requirements for UC volume paths
+        UCObjectStore.validate_path(path)
+        return RemoteUploaderDownloader(bucket_uri=uri, backend_kwargs={'path': path})
     else:
         raise NotImplementedError(f'There is no implementation for the cloud backend {backend} via URI. Please use '
-                                  's3 or one of the supported RemoteUploaderDownloader object stores')
+                                  'one of the supported RemoteUploaderDownloader object stores')
 
 
 def get_file(path: str,
diff --git a/composer/utils/object_store/__init__.py b/composer/utils/object_store/__init__.py
index 49fc6096e7..de28ec9674 100644
--- a/composer/utils/object_store/__init__.py
+++ b/composer/utils/object_store/__init__.py
@@ -9,8 +9,9 @@
 from composer.utils.object_store.oci_object_store import OCIObjectStore
 from composer.utils.object_store.s3_object_store import S3ObjectStore
 from composer.utils.object_store.sftp_object_store import SFTPObjectStore
+from composer.utils.object_store.uc_object_store import UCObjectStore
 
 __all__ = [
     'ObjectStore', 'ObjectStoreTransientError', 'LibcloudObjectStore', 'S3ObjectStore', 'SFTPObjectStore',
-    'OCIObjectStore', 'GCSObjectStore'
+    'OCIObjectStore', 'GCSObjectStore', 'UCObjectStore'
 ]
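
[Reviewer note, not part of the diff] With the wiring above, a `dbfs:/Volumes/...` URI now resolves to
the new backend. A minimal sketch, assuming the databricks extra is installed; the catalog, schema, and
volume names are placeholders:

    from composer.utils import maybe_create_object_store_from_uri

    # parse_uri() keeps everything after the scheme, so the backend receives
    # path='Volumes/my_catalog/my_schema/my_volume/checkpoints/model.pt'.
    store = maybe_create_object_store_from_uri(
        'dbfs:/Volumes/my_catalog/my_schema/my_volume/checkpoints/model.pt')

A dbfs URI that does not point under /Volumes (e.g. `dbfs:/checkpoint/model.pt`) fails
`UCObjectStore.validate_path` with a ValueError, as the tests later in this patch exercise.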
diff --git a/composer/utils/object_store/uc_object_store.py b/composer/utils/object_store/uc_object_store.py
new file mode 100644
index 0000000000..af6f60321e
--- /dev/null
+++ b/composer/utils/object_store/uc_object_store.py
@@ -0,0 +1,214 @@
+# Copyright 2022 MosaicML Composer authors
+# SPDX-License-Identifier: Apache-2.0
+
+"""Databricks Unity Catalog Volumes object store."""
+
+from __future__ import annotations
+
+import logging
+import os
+import pathlib
+import uuid
+from typing import Callable, List, Optional
+
+from composer.utils.import_helpers import MissingConditionalImportError
+from composer.utils.object_store.object_store import ObjectStore, ObjectStoreTransientError
+
+log = logging.getLogger(__name__)
+
+__all__ = ['UCObjectStore']
+
+_NOT_FOUND_ERROR_CODE = 'NOT_FOUND'
+
+
+def _wrap_errors(uri: str, e: Exception):
+    from databricks.sdk.core import DatabricksError
+    if isinstance(e, DatabricksError):
+        if e.error_code == _NOT_FOUND_ERROR_CODE:  # type: ignore
+            raise FileNotFoundError(f'Object {uri} not found') from e
+    raise ObjectStoreTransientError from e
+
+
+class UCObjectStore(ObjectStore):
+    """Utility class for uploading and downloading data from Databricks Unity Catalog (UC) Volumes.
+
+    .. note::
+
+        Using this object store requires setting `DATABRICKS_HOST` and `DATABRICKS_TOKEN`
+        environment variables with the right credentials to be able to access the files in
+        the Unity Catalog volumes.
+
+    Args:
+        path (str): The Databricks UC Volume path that is of the format
+            `Volumes/<catalog>/<schema>/<volume>/path/to/folder`.
+            Note that this prefix should always start with /Volumes and adhere to the above format
+            since this object store only supports Unity Catalog Volumes and
+            not other Databricks Filesystems.
+    """
+
+    def __init__(self, path: str) -> None:
+        try:
+            from databricks.sdk import WorkspaceClient
+        except ImportError as e:
+            raise MissingConditionalImportError('databricks', conda_package='databricks-sdk>=0.8.0,<1.0') from e
+
+        if not 'DATABRICKS_HOST' in os.environ or not 'DATABRICKS_TOKEN' in os.environ:
+            raise ValueError('Environment variables `DATABRICKS_HOST` and `DATABRICKS_TOKEN` '
+                             'must be set to use Databricks Unity Catalog Volumes')
+        self.prefix = self.validate_path(path)
+        self.client = WorkspaceClient()
+
+    @staticmethod
+    def validate_path(path: str) -> str:
+        """Parses the given path to extract the UC Volume prefix from the path.
+
+        .. note::
+
+            This function only uses the first 4 directories from the path to construct the
+            UC Volumes prefix and will ignore the rest of the directories in the path.
+
+        Args:
+            path (str): The Databricks UC Volume path of the format
+                `Volumes/<catalog>/<schema>/<volume>/path/to/folder`.
+        """
+        path = os.path.normpath(path)
+        if not path.startswith('Volumes'):
+            raise ValueError('Databricks Unity Catalog Volumes paths should start with "/Volumes".')
+
+        dirs = path.split(os.sep)
+        if len(dirs) < 4:
+            raise ValueError(f'Databricks Unity Catalog Volumes path expected to be of the format '
+                             '`Volumes/<catalog>/<schema>/<volume>/<path>`. '
+                             f'Found path={path}')
+
+        # The first 4 dirs form the prefix
+        return os.path.join(*dirs[:4])
+
+    def _get_object_path(self, object_name: str) -> str:
+        """Return the absolute Single Path Namespace for the given object_name.
+
+        Args:
+            object_name (str): Absolute or relative path of the object w.r.t. the
+                UC Volumes root.
+        """
+        # convert object name to relative path if prefix is included
+        if os.path.commonprefix([object_name, self.prefix]) == self.prefix:
+            object_name = os.path.relpath(object_name, start=self.prefix)
+        return os.path.join('/', self.prefix, object_name)
+
+    def get_uri(self, object_name: str) -> str:
+        """Returns the URI for ``object_name``.
+
+        .. note::
+
+            This function does not check that ``object_name`` is in the object store.
+            It computes the URI statically.
+
+        Args:
+            object_name (str): The object name.
+
+        Returns:
+            str: The URI for ``object_name`` in the object store.
+        """
+        return f'dbfs:{self._get_object_path(object_name)}'
+
+    def upload_object(self,
+                      object_name: str,
+                      filename: str | pathlib.Path,
+                      callback: Callable[[int, int], None] | None = None) -> None:
+        """Upload a file from local to UC volumes.
+
+        Args:
+            object_name (str): Name of the stored object in UC volumes w.r.t. volume root.
+            filename (str | pathlib.Path): Path to the object on disk.
+            callback ((int, int) -> None, optional): Unused
+        """
+        # remove unused variable
+        del callback
+        with open(filename, 'rb') as f:
+            self.client.files.upload(self._get_object_path(object_name), f)
+
+    def download_object(self,
+                        object_name: str,
+                        filename: str | pathlib.Path,
+                        overwrite: bool = False,
+                        callback: Callable[[int, int], None] | None = None) -> None:
+        """Download the given object from UC Volumes to the specified filename.
+
+        Args:
+            object_name (str): The name of the object to download, i.e. the path relative to the root of the volume.
+            filename (str | pathlib.Path): The local path where the file will be downloaded.
+            overwrite (bool, optional): Whether to overwrite an existing file at ``filename``, if it exists.
+                (default: ``False``)
+            callback ((int, int) -> None, optional): Unused
+
+        Raises:
+            FileNotFoundError: If the file was not found in UC volumes.
+            ObjectStoreTransientError: If there was any other error querying the Databricks UC volumes
+                that should be retried.
+        """
+        # remove unused variable
+        del callback
+
+        if os.path.exists(filename) and not overwrite:
+            raise FileExistsError(f'The file at {filename} already exists and overwrite is set to False.')
+
+        dirname = os.path.dirname(filename)
+        if dirname:
+            os.makedirs(dirname, exist_ok=True)
+        tmp_path = str(filename) + f'{uuid.uuid4()}.tmp'
+
+        try:
+            from databricks.sdk.core import DatabricksError
+            try:
+                with self.client.files.download(self._get_object_path(object_name)).contents as resp:
+                    with open(tmp_path, 'wb') as f:
+                        # Chunk the data into multiple blocks of 64MB to avoid
+                        # OOMs when downloading really large files
+                        for chunk in iter(lambda: resp.read(64 * 1024 * 1024), b''):
+                            f.write(chunk)
+            except DatabricksError as e:
+                _wrap_errors(self.get_uri(object_name), e)
+        except:
+            # Make best effort attempt to clean up the temporary file
+            try:
+                os.remove(tmp_path)
+            except OSError:
+                pass
+            raise
+        else:
+            if overwrite:
+                os.replace(tmp_path, filename)
+            else:
+                os.rename(tmp_path, filename)
+
+    def get_object_size(self, object_name: str) -> int:
+        """Get the size of the object in UC volumes in bytes.
+
+        Args:
+            object_name (str): The name of the object.
+
+        Returns:
+            int: The object size, in bytes.
+
+        Raises:
+            FileNotFoundError: If the file was not found in the object store.
+        """
+        from databricks.sdk.core import DatabricksError
+        try:
+            file_info = self.client.files.get_status(self._get_object_path(object_name))
+            return file_info.file_size
+        except DatabricksError as e:
+            _wrap_errors(self.get_uri(object_name), e)
+
+    def list_objects(self, prefix: Optional[str]) -> List[str]:
+        """List all objects in the object store with the given prefix.
+
+        Args:
+            prefix (str): The prefix to search for.
+
+        Returns:
+            list[str]: A list of object names that match the prefix.
+        """
+        # TODO: Implement this function once UC volumes list endpoint is available in the SDK
+        del prefix  # unused
+        raise NotImplementedError(f'{type(self).__name__}.list_objects is not implemented')
diff --git a/setup.py b/setup.py
index 9331f541f8..84d848b8d0 100644
--- a/setup.py
+++ b/setup.py
@@ -228,6 +228,8 @@ def package_files(prefix: str, directory: str, extension: str):
 
 extra_deps['pandas'] = ['pandas>=2.0.0,<3.0']
 
+extra_deps['databricks'] = ['databricks-sdk>=0.8.0,<1.0']
+
 extra_deps['all'] = {dep for deps in extra_deps.values() for dep in deps}
 
 composer_data_files = ['py.typed']
diff --git a/tests/utils/object_store/object_store_settings.py b/tests/utils/object_store/object_store_settings.py
index 4c8fbd2eab..d94cd70fd6 100644
--- a/tests/utils/object_store/object_store_settings.py
+++ b/tests/utils/object_store/object_store_settings.py
@@ -15,7 +15,7 @@
 import composer.utils.object_store
 import composer.utils.object_store.sftp_object_store
 from composer.utils.object_store import (GCSObjectStore, LibcloudObjectStore, ObjectStore, OCIObjectStore,
-                                         S3ObjectStore, SFTPObjectStore)
+                                         S3ObjectStore, SFTPObjectStore, UCObjectStore)
 from composer.utils.object_store.sftp_object_store import SFTPObjectStore
 from tests.common import get_module_subclasses
 
@@ -56,8 +56,8 @@
 object_stores = [
     pytest.param(x, marks=_object_store_marks[x], id=x.__name__)
     for x in get_module_subclasses(composer.utils.object_store, ObjectStore)
-    # Note: OCI and Gs have their own test suite, so they are exempt from being included in this one.``
-    if not issubclass(x, OCIObjectStore) and not issubclass(x, GCSObjectStore)
+    # Note: OCI, GCS, and UC have their own test suites, so they are exempt from being included in this one.
+    if not issubclass(x, OCIObjectStore) and not issubclass(x, GCSObjectStore) and not issubclass(x, UCObjectStore)
 ]
diff --git a/tests/utils/object_store/test_uc_object_store.py b/tests/utils/object_store/test_uc_object_store.py
new file mode 100644
index 0000000000..792d7b3914
--- /dev/null
+++ b/tests/utils/object_store/test_uc_object_store.py
@@ -0,0 +1,161 @@
+# Copyright 2022 MosaicML Composer authors
+# SPDX-License-Identifier: Apache-2.0
+
+from pathlib import Path
+from unittest import mock
+from unittest.mock import ANY, MagicMock
+
+import pytest
+from torch.utils.data import DataLoader
+
+from composer.loggers import RemoteUploaderDownloader
+from composer.trainer import Trainer
+from composer.utils import UCObjectStore
+from composer.utils.object_store.object_store import ObjectStoreTransientError
+from tests.common import RandomClassificationDataset, SimpleModel
+
+
+@pytest.fixture
+def ws_client(monkeypatch):
+    mock_files = MagicMock()
+    mock_ws_client = MagicMock()
+    monkeypatch.setattr(mock_ws_client, 'files', mock_files)
+    return mock_ws_client
+
+
+@pytest.fixture
+def uc_object_store(ws_client, monkeypatch):
+    db = pytest.importorskip('databricks.sdk', reason='requires databricks')
+
+    monkeypatch.setenv('DATABRICKS_HOST', 'test-host')
+    monkeypatch.setenv('DATABRICKS_TOKEN', 'test-token')
+    with mock.patch.object(db, 'WorkspaceClient', lambda: ws_client):
+        yield UCObjectStore(path='Volumes/catalog/schema/volume/path/')
+
+
+@pytest.mark.skip  # TODO: setup databricks auth on github actions
+@pytest.mark.remote
+def test_uc_object_store_integration():
+    model = SimpleModel()
+    train_dataset = RandomClassificationDataset()
+    train_dataloader = DataLoader(dataset=train_dataset)
+    trainer_save = Trainer(model=model,
+                           train_dataloader=train_dataloader,
+                           save_folder='dbfs:/Volumes/ml/mosaicml/test-volume/checkpoints/{run_name}',
+                           save_filename='test-model.pt',
+                           max_duration='1ba')
+    run_name = trainer_save.state.run_name
+    trainer_save.fit()
+    trainer_save.close()
+
+    trainer_load = Trainer(model=model,
+                           train_dataloader=train_dataloader,
+                           load_path=f'dbfs:/Volumes/ml/mosaicml/test-volume/checkpoints/{run_name}/test-model.pt',
+                           max_duration='2ba')
+    trainer_load.fit()
+    trainer_load.close()
+
+
+def test_uc_object_store_without_env():
+    with pytest.raises(ValueError):
+        UCObjectStore(path='Volumes/test-volume/')
+
+
+def test_uc_object_store_invalid_prefix():
+    with pytest.raises(ValueError):
+        UCObjectStore(path='root/')
+    with pytest.raises(ValueError):
+        UCObjectStore(path='uc://Volumes')
+    with pytest.raises(ValueError):
+        UCObjectStore(path='Volumes/catalog/schema/')
+
+
+@pytest.mark.parametrize('result', ['success', 'not_found'])
+def test_get_object_size(ws_client, uc_object_store, result: str):
+    if result == 'success':
+        db_files = pytest.importorskip('databricks.sdk.service.files')
+        ws_client.files.get_status.return_value = db_files.FileInfo(file_size=100)
+        assert uc_object_store.get_object_size('train.txt') == 100
+    elif result == 'not_found':
+        db_core = pytest.importorskip('databricks.sdk.core', reason='requires databricks')
+        ws_client.files.get_status.side_effect = db_core.DatabricksError('The file being accessed is not found',
+                                                                         error_code='NOT_FOUND')
+        with pytest.raises(FileNotFoundError):
+            uc_object_store.get_object_size('train.txt')
+    else:
+        raise NotImplementedError(f'Test for result={result} is not implemented.')
+
+
+def test_get_uri(uc_object_store):
+    assert uc_object_store.get_uri('train.txt') == 'dbfs:/Volumes/catalog/schema/volume/train.txt'
+    assert uc_object_store.get_uri('Volumes/catalog/schema/volume/checkpoint/model.bin'
+                                  ) == 'dbfs:/Volumes/catalog/schema/volume/checkpoint/model.bin'
+
+
+def test_upload_object(ws_client, uc_object_store, tmp_path):
+    file_to_upload = str(tmp_path / Path('train.txt'))
+    with open(file_to_upload, 'wb') as f:
+        f.write(bytes(range(20)))
+
+    uc_object_store.upload_object(object_name='train.txt', filename=file_to_upload)
+    ws_client.files.upload.assert_called_with('/Volumes/catalog/schema/volume/train.txt', ANY)
+
+
+@pytest.mark.parametrize('result', ['success', 'file_exists', 'overwrite_file', 'not_found', 'error'])
+def test_download_object(ws_client, uc_object_store, tmp_path, result: str):
+
+    object_name = 'remote-model.bin'
+    file_content = bytes('0' * (100), 'utf-8')
+    file_to_download = str(tmp_path / Path('model.bin'))
+
+    def generate_dummy_file(_):
+        db_files = pytest.importorskip('databricks.sdk.service.files')
+        with open(file_to_download, 'wb') as fp:
+            fp.write(file_content)
+        f = open(file_to_download, 'rb')
+        return db_files.DownloadResponse(contents=f)
+
+    if result == 'success':
+        ws_client.files.download.side_effect = generate_dummy_file
+        uc_object_store.download_object(object_name, filename=file_to_download)
+        ws_client.files.download.assert_called_with('/Volumes/catalog/schema/volume/remote-model.bin')
+
+    elif result == 'file_exists':
+        with open(file_to_download, 'wb') as fp:
+            fp.write(bytes('1' * (100), 'utf-8'))
+        with pytest.raises(FileExistsError):
+            uc_object_store.download_object(object_name, file_to_download)
+
+    elif result == 'overwrite_file':
+        with open(file_to_download, 'wb') as fp:
+            fp.write(bytes('1' * (100), 'utf-8'))
+        ws_client.files.download.side_effect = generate_dummy_file
+        uc_object_store.download_object(object_name, file_to_download, overwrite=True)
+        ws_client.files.download.assert_called_with('/Volumes/catalog/schema/volume/remote-model.bin')
+
+        # verify that the file was actually overwritten
+        with open(file_to_download, 'rb') as f:
+            actual_content = f.readline()
+        assert actual_content == file_content
+
+    elif result == 'not_found':
+        db_core = pytest.importorskip('databricks.sdk.core', reason='requires databricks')
+        ws_client.files.download.side_effect = db_core.DatabricksError('The file being accessed is not found',
+                                                                       error_code='NOT_FOUND')
+        with pytest.raises(FileNotFoundError):
+            uc_object_store.download_object(object_name, file_to_download)
+
+    elif result == 'error':
+        db_core = pytest.importorskip('databricks.sdk.core', reason='requires databricks')
+        ws_client.files.download.side_effect = db_core.DatabricksError
+
+        with pytest.raises(ObjectStoreTransientError):
+            uc_object_store.download_object(object_name, file_to_download)
+    else:
+        raise NotImplementedError(f'Test for result={result} is not implemented.')
+
+
+def test_uc_object_store_with_remote_ud(uc_object_store):
+    uri = 'dbfs:/Volumes/path/to/my/folder/'
+    rud = RemoteUploaderDownloader(bucket_uri=uri, backend_kwargs={'path': 'Volumes/catalog/schema/volume/path'})
+    assert isinstance(rud.remote_backend, UCObjectStore)
diff --git a/tests/utils/test_file_helpers.py b/tests/utils/test_file_helpers.py
index 3a24388222..2e757afbe4 100644
--- a/tests/utils/test_file_helpers.py
+++ b/tests/utils/test_file_helpers.py
@@ -16,6 +16,7 @@
                                  format_name_with_dist, format_name_with_dist_and_time, get_file, is_tar,
                                  maybe_create_object_store_from_uri,
                                  maybe_create_remote_uploader_downloader_from_uri, parse_uri)
+from composer.utils.object_store import UCObjectStore
 from composer.utils.object_store.libcloud_object_store import LibcloudObjectStore
 from tests.common.markers import world_size
 from tests.loggers.test_remote_uploader_downloader import DummyObjectStore
@@ -284,6 +285,10 @@ def test_maybe_create_object_store_from_uri(monkeypatch):
     monkeypatch.setattr(file_helpers, 'OCIObjectStore', mock_oci_obj)
     mock_gs_obj = MagicMock()
     monkeypatch.setattr(file_helpers, 'GCSObjectStore', mock_gs_obj)
+    mock_uc_obj = MagicMock()
+    # un-mock the static method that validates the path
+    mock_uc_obj.validate_path.side_effect = UCObjectStore.validate_path
+    monkeypatch.setattr(file_helpers, 'UCObjectStore', mock_uc_obj)
 
     assert maybe_create_object_store_from_uri('checkpoint/for/my/model.pt') is None
 
@@ -302,6 +307,12 @@ def test_maybe_create_object_store_from_uri(monkeypatch):
     with pytest.raises(NotImplementedError):
         maybe_create_object_store_from_uri('ms://bucket/checkpoint/for/my/model.pt')
 
+    maybe_create_object_store_from_uri('dbfs:/Volumes/catalog/schema/volume/checkpoint/model.pt')
+    mock_uc_obj.assert_called_once_with(path='Volumes/catalog/schema/volume/checkpoint/model.pt')
+
+    with pytest.raises(ValueError):
+        maybe_create_object_store_from_uri('dbfs:/checkpoint/for/my/model.pt')
+
 
 def test_maybe_create_remote_uploader_downloader_from_uri(monkeypatch):
     assert maybe_create_remote_uploader_downloader_from_uri('checkpoint/for/my/model.pt', loggers=[]) is None
@@ -338,6 +349,16 @@ def test_maybe_create_remote_uploader_downloader_from_uri(monkeypatch):
     with pytest.raises(NotImplementedError):
         maybe_create_remote_uploader_downloader_from_uri('ms://bucket/checkpoint/for/my/model.pt', loggers=[])
 
+    with monkeypatch.context() as m:
+        mock_remote_ud = MagicMock()
+        m.setattr(loggers, 'RemoteUploaderDownloader', mock_remote_ud)
+        maybe_create_remote_uploader_downloader_from_uri('dbfs:/Volumes/checkpoint/for/my/model.pt', loggers=[])
+        mock_remote_ud.assert_called_once_with(bucket_uri='dbfs:/Volumes/checkpoint/for/my/model.pt',
+                                               backend_kwargs={'path': 'Volumes/checkpoint/for/my/model.pt'})
+
+    with pytest.raises(ValueError):
+        maybe_create_remote_uploader_downloader_from_uri('dbfs:/checkpoint/for/my/model.pt', loggers=[])
+
 
 def test_ensure_folder_is_empty(tmp_path: pathlib.Path):
     ensure_folder_is_empty(tmp_path)
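
[Reviewer note, not part of the diff] End to end, checkpointing to UC Volumes needs only the new
databricks extra (e.g. `pip install 'mosaicml[databricks]'`, per the setup.py change above) and a
`dbfs:/Volumes/...` save folder, mirroring the currently skipped integration test. A sketch; the model,
dataloader, and volume path are placeholders:

    from torch.utils.data import DataLoader

    from composer.trainer import Trainer
    from tests.common import RandomClassificationDataset, SimpleModel

    # Checkpoints stream through RemoteUploaderDownloader to the new 'dbfs' backend.
    trainer = Trainer(model=SimpleModel(),
                      train_dataloader=DataLoader(RandomClassificationDataset()),
                      save_folder='dbfs:/Volumes/my_catalog/my_schema/my_volume/checkpoints/{run_name}',
                      save_filename='model.pt',
                      max_duration='1ba')
    trainer.fit()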