Skip to content

Commit

Permalink
Merge branch 'main' into kerchunk_engine_opener
Browse files Browse the repository at this point in the history
  • Loading branch information
norlandrhagen committed Jun 18, 2024
2 parents 6291fc1 + 5cee848 commit c31783d
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request'
uses: codecov/codecov-action@v4.3.1
uses: codecov/codecov-action@v4.5.0
with:
file: ./coverage.xml
env_vars: OS,PYTHON
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ repos:
args: ["--line-length", "100"]

- repo: https://github.com/PyCQA/flake8
rev: 7.0.0
rev: 7.1.0
hooks:
- id: flake8
exclude: pangeo_forge_recipes/recipes
Expand Down
4 changes: 2 additions & 2 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pangeo-sphinx-book-theme==0.2
myst-nb==1.1.0
sphinx-copybutton==0.5.2
sphinx-togglebutton==0.3.2
sphinx-autodoc-typehints==2.1.0
sphinx-autodoc-typehints==2.1.1
sphinxext-opengraph==0.9.1
sphinx-design==0.5.0
sphinx-design==0.6.0
-e .
5 changes: 3 additions & 2 deletions pangeo_forge_recipes/openers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def open_url(
cache: Optional[CacheFSSpecTarget] = None,
secrets: Optional[Dict] = None,
open_kwargs: Optional[Dict] = None,
fsspec_sync_patch: bool = False,
) -> OpenFileType:
"""Open a string-based URL with fsspec.
Expand All @@ -29,10 +30,10 @@ def open_url(
kw = open_kwargs or {}
if cache is not None:
# this has side effects
cache.cache_file(url, secrets, **kw)
cache.cache_file(url, secrets, fsspec_sync_patch, **kw)
open_file = cache.open_file(url, mode="rb")
else:
open_file = _get_opener(url, secrets, **kw)
open_file = _get_opener(url, secrets, fsspec_sync_patch, **kw)
return open_file


Expand Down
25 changes: 15 additions & 10 deletions pangeo_forge_recipes/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@
OpenFileType = Union[fsspec.core.OpenFile, fsspec.spec.AbstractBufferedFile, io.IOBase]


def _get_url_size(fname, secrets, **open_kwargs):
with _get_opener(fname, secrets, **open_kwargs) as of:
size = of.size
return size


def _copy_btw_filesystems(input_opener, output_opener, BLOCK_SIZE=10_000_000):
with input_opener as source:
with output_opener as target:
Expand Down Expand Up @@ -192,18 +186,22 @@ def _full_path(self, path: str) -> str:
class CacheFSSpecTarget(FlatFSSpecTarget):
"""Alias for FlatFSSpecTarget"""

def cache_file(self, fname: str, secrets: Optional[dict], **open_kwargs) -> None:
def cache_file(
self, fname: str, secrets: Optional[dict], fsspec_sync_patch=False, **open_kwargs
) -> None:
# check and see if the file already exists in the cache
logger.info(f"Caching file '{fname}'")
input_opener = _get_opener(fname, secrets, fsspec_sync_patch, **open_kwargs)

if self.exists(fname):
cached_size = self.size(fname)
remote_size = _get_url_size(fname, secrets, **open_kwargs)
with input_opener as of:
remote_size = of.size
if cached_size == remote_size:
# TODO: add checksumming here
logger.info(f"File '{fname}' is already cached")
return

input_opener = _get_opener(fname, secrets, **open_kwargs)
target_opener = self.open(fname, mode="wb")
logger.info(f"Copying remote file '{fname}' to cache")
_copy_btw_filesystems(input_opener, target_opener)
Expand All @@ -228,7 +226,14 @@ def _add_query_string_secrets(fname: str, secrets: dict) -> str:
return urlunparse(parsed)


def _get_opener(fname, secrets, **open_kwargs):
def _get_opener(fname, secrets, fsspec_sync_patch, **open_kwargs):
if fsspec_sync_patch:
logger.debug("Attempting to enable synchronous filesystem implementations in FSSpec")
from httpfs_sync.core import SyncHTTPFileSystem

SyncHTTPFileSystem.overwrite_async_registration()
logger.debug("Synchronous HTTP implementation enabled.")

fname = fname if not secrets else _add_query_string_secrets(fname, secrets)
return fsspec.open(fname, mode="rb", **open_kwargs)

Expand Down
6 changes: 5 additions & 1 deletion pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,16 @@ class OpenURLWithFSSpec(beam.PTransform):
:param secrets: If provided these secrets will be injected into the URL as a query string.
:param open_kwargs: Extra arguments passed to fsspec.open.
:param max_concurrency: Max concurrency for this transform.
:param fsspec_sync_patch: Experimental. Likely slower. When enabled, this attempts to
replace asynchronous code with synchronous implementations to potentially address
deadlocking issues. cf. https://github.com/h5py/h5py/issues/2019
"""

cache: Optional[str | CacheFSSpecTarget] = None
secrets: Optional[dict] = None
open_kwargs: Optional[dict] = None
max_concurrency: Optional[int] = None
fsspec_sync_patch: bool = False

def expand(self, pcoll):
if isinstance(self.cache, str):
Expand All @@ -161,6 +165,7 @@ def expand(self, pcoll):
kws = dict(
cache=cache,
secrets=self.secrets,
fsspec_sync_patch=self.fsspec_sync_patch,
open_kwargs=self.open_kwargs,
)
return pcoll | MapWithConcurrencyLimit(
Expand Down Expand Up @@ -699,7 +704,6 @@ def expand(
| beam.Map(self.dynamic_chunking_fn, **self.dynamic_chunking_fn_kwargs)
)
)
logger.info(f"Storing Zarr with {target_chunks=} to {self.get_full_target()}")
rechunked_datasets = indexed_datasets | Rechunk(target_chunks=target_chunks, schema=schema)
target_store = schema | PrepareZarrTarget(
target=self.get_full_target(),
Expand Down

0 comments on commit c31783d

Please sign in to comment.