Skip to content

Commit

Permalink
New storage APIs (#536)
Browse files Browse the repository at this point in the history
* New storage APIs.

* Potentially fix import issue.

* Fix (path).

* Fix (paths).

* Fix (paths).
  • Loading branch information
knighton committed Dec 15, 2023
1 parent 78c150e commit 02bd910
Show file tree
Hide file tree
Showing 4 changed files with 382 additions and 38 deletions.
10 changes: 8 additions & 2 deletions streaming/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
download_from_azure_datalake,
download_from_databricks_unity_catalog, download_from_dbfs,
download_from_gcs, download_from_local, download_from_oci,
download_from_s3, download_from_sftp,
wait_for_file_to_exist)
download_from_s3, download_from_sftp)
from streaming.storage.extra import (file_exists, list_dataset_files, smart_download_file,
wait_for_file_to_exist, walk_dir, walk_prefix)
from streaming.storage.upload import (AzureDataLakeUploader, AzureUploader, CloudUploader,
GCSUploader, LocalUploader, OCIUploader, S3Uploader)

Expand All @@ -31,4 +32,9 @@
'download_from_dbfs',
'download_from_local',
'wait_for_file_to_exist',
'walk_prefix',
'walk_dir',
'list_dataset_files',
'smart_download_file',
'file_exists',
]
28 changes: 0 additions & 28 deletions streaming/storage/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import pathlib
import shutil
import urllib.parse
from time import sleep, time
from typing import Any, Dict, Optional

from streaming.util import get_import_exception_message
Expand All @@ -22,7 +21,6 @@
'download_from_databricks_unity_catalog',
'download_from_dbfs',
'download_from_local',
'wait_for_file_to_exist',
]

BOTOCORE_CLIENT_ERROR_CODES = {'403', '404', 'NoSuchKey'}
Expand Down Expand Up @@ -473,29 +471,3 @@ def download_file(remote: Optional[str], local: str, timeout: float):
download_from_dbfs(remote, local)
else:
download_from_local(remote, local)


def wait_for_file_to_exist(filename: str, poll_interval: float, timeout: float,
err_msg: str) -> None:
"""Wait for the file to exist till timeout seconds. Raise an Exception after that.
Args:
filename (str): A file name
poll_interval (float): Number of seconds to wait before next polling
timeout (float): Number of seconds to wait for a file to exist before raising an exception
err_msg (str): Error message description for an exception
Raises:
RuntimeError: Raise an Exception if file does not exist after timeout
"""
start_time = time()
while True:
sleep(poll_interval)
if os.path.exists(filename):
sleep(poll_interval)
break
dt = time() - start_time
if dt > timeout:
raise RuntimeError(
f'{err_msg} due to timeout. Waited {dt:.3f} sec, which is longer than the ' +
f'timeout limit of {timeout:.3f} sec.')
Loading

0 comments on commit 02bd910

Please sign in to comment.