Skip to content

Commit

Permalink
feat: Switch file reading to use concrete endpoints (#48)
Browse files Browse the repository at this point in the history
* Initial attempt: Find endpoint for open, no retrying on read failure

* Moved definition of fs attribute in File class

* Fix bug so new files can be written

* Make locate_all_files optional

* Fix valid_sources bug that never added sources

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* change prints to logging.debug

* Formatting and precommit fixes

* Clean up imports

* Put logging.debug back in

* Address early May PR changes

* Only try to locate all sources if given path does not successfully open file first

---------

Co-authored-by: Ryan Simeon <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Lindsey Gray <[email protected]>
  • Loading branch information
4 people authored Jun 4, 2024
1 parent 040bcbf commit 784540c
Showing 1 changed file with 90 additions and 2 deletions.
92 changes: 90 additions & 2 deletions src/fsspec_xrootd/xrootd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import io
import logging
import os.path
import time
import warnings
Expand Down Expand Up @@ -234,6 +235,8 @@ def __init__(
hostid: str,
asynchronous: bool = False,
loop: asyncio.AbstractEventLoop | None = None,
locate_all_sources: bool = True,
valid_sources: list[str] | None = None,
**storage_options: Any,
) -> None:
"""
Expand All @@ -248,10 +251,23 @@ def __init__(
If true, synchronous methods will not be available in this instance
loop:
Bring your own loop (for sync methods)
locate_all_sources = True: bool
Only active for reading (does nothing for writing). Defaults to True.
Finds all locations at which the file is hosted, and chooses from those. Does
not let the redirector pick the first to respond.
valid_sources = None: list
If given and locate_all_sources is True, fsspec will only reject any file host
not in this list. Entries should be of the form ie: `cmsxrootd-site1.fnal.gov`
(no port number)
"""
super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
self.timeout = storage_options.get("timeout", XRootDFileSystem.default_timeout)
self.hostid = hostid
self.locate_all_sources = locate_all_sources
if valid_sources:
self.valid_sources = valid_sources
else:
self.valid_sources = []
self._myclient = client.FileSystem("root://" + hostid)
if not self._myclient.url.is_valid():
raise ValueError(f"Invalid hostid: {hostid!r}")
Expand Down Expand Up @@ -756,14 +772,34 @@ def __init__(
if not isinstance(path, str):
raise ValueError(f"Path expected to be string, path: {path}")

self.fs = fs
# Ensure any read-only handle is closed
fs.invalidate_cache(path)

# Try opening with given pathname before trying to locate all sources (if requested)
self._myFile = client.File()
status, _ = self._myFile.open(
status, _n = self._myFile.open(
fs.unstrip_protocol(path),
self.mode,
timeout=self.timeout,
)
if not status.ok and "r" in mode and self.fs.locate_all_sources:
self._hosts = self._locate_sources(path)
# Try hosts until you find an openable file
for i_host in range(len(self._hosts)):
self._myFile = client.File()
status, _n = self._myFile.open(
fs.protocol + "://" + self._hosts[i_host] + "/" + path,
self.mode,
timeout=self.timeout,
)
if status.ok:
# Move hosts that tried and failed to self._dismissed_hosts
self._dismissed_hosts = self._hosts[:i_host]
self._hosts = self._hosts[i_host:]
break
# If above loop cannot find source OR locate_all_sources is off and we
# could not read file initially, end up here
if not status.ok:
raise OSError(f"File did not open properly: {status.message}")

Expand All @@ -773,7 +809,6 @@ def __init__(
self.metaOffset = _deets.size

self.path = path
self.fs = fs
self.mode = mode
self.blocksize = (
self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
Expand Down Expand Up @@ -815,6 +850,59 @@ def __init__(
self.location = None
self.offset = 0

def _locate_sources(self, logical_filename: str) -> list[str]:
"""Find hosts that have the desired file.
Gets a list of hosts from the XRootD server that was provided when the
XRootDFile object was instantiated. Note that this implies it will only find
more hosts of the given file if self.fs is a redirector. Implementation of a
solution from the Pepper project in this issue:
(https://github.com/CoffeaTeam/fsspec-xrootd/issues/36).
If valid_sources is a non-empty list in fs.storage_options, will only return domain names
that are also in valid_sources
Parameters
----------
logical_filename: The logical filename of the file. (ex: "//store/mc/other/stuff/file.root")
Returns
-------
List of domain names that host the file
"""
myclient = self.fs._myclient
# From Pepper:
# The flag PrefName (to get domain names instead of IP addresses) does
# not exist in the Python bindings. However, MAKEPATH has the same value
status, loc = myclient.locate(logical_filename, client.flags.OpenFlags.MAKEPATH)
if loc is None:
raise OSError("XRootD error: " + status.message)
hosts = []
for r in loc:
if len(r.address.split(":")) > 1:
# Strip off the port number if necessary
clean_address = "".join(r.address.split(":")[:-1])
else:
clean_address = r.address
if (clean_address in self.fs.valid_sources) or (
len(self.fs.valid_sources) == 0
):
hosts.append(clean_address)
logging.debug(f"Added host {clean_address} to _hosts")
else:
logging.debug(
f"Host {clean_address} not in valid_sources {self.fs.valid_sources}"
)
if len(hosts) == 0:
err_msg = f"XRootD error: No hosts for file {logical_filename} found using XRootD server {self.fs.storage_options['hostid']}"
if len(self.fs.valid_sources) > 0:
vld_src_msg = f" and valid sources {self.fs.valid_sources}"
raise OSError(err_msg + vld_src_msg)
else:
raise OSError(err_msg)
return hosts

def _fetch_range(self, start: int, end: int) -> Any:
status, data = self._myFile.read(
self.metaOffset + start, self.metaOffset + end - start, timeout=self.timeout
Expand Down

0 comments on commit 784540c

Please sign in to comment.