diff --git a/src/fsspec_xrootd/xrootd.py b/src/fsspec_xrootd/xrootd.py index da2b7f7..4090d3f 100644 --- a/src/fsspec_xrootd/xrootd.py +++ b/src/fsspec_xrootd/xrootd.py @@ -2,6 +2,7 @@ import asyncio import io +import logging import os.path import time import warnings @@ -234,6 +235,8 @@ def __init__( hostid: str, asynchronous: bool = False, loop: asyncio.AbstractEventLoop | None = None, + locate_all_sources: bool = True, + valid_sources: list[str] | None = None, **storage_options: Any, ) -> None: """ @@ -248,10 +251,23 @@ def __init__( If true, synchronous methods will not be available in this instance loop: Bring your own loop (for sync methods) + locate_all_sources = True: bool + Only active for reading (does nothing for writing). Defaults to True. + Finds all locations at which the file is hosted, and chooses from those. Does + not let the redirector pick the first to respond. + valid_sources = None: list + If given and locate_all_sources is True, fsspec will only reject any file host + not in this list. Entries should be of the form ie: `cmsxrootd-site1.fnal.gov` + (no port number) """ super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options) self.timeout = storage_options.get("timeout", XRootDFileSystem.default_timeout) self.hostid = hostid + self.locate_all_sources = locate_all_sources + if valid_sources: + self.valid_sources = valid_sources + else: + self.valid_sources = [] self._myclient = client.FileSystem("root://" + hostid) if not self._myclient.url.is_valid(): raise ValueError(f"Invalid hostid: {hostid!r}") @@ -756,14 +772,34 @@ def __init__( if not isinstance(path, str): raise ValueError(f"Path expected to be string, path: {path}") + self.fs = fs # Ensure any read-only handle is closed fs.invalidate_cache(path) + + # Try opening with given pathname before trying to locate all sources (if requested) self._myFile = client.File() - status, _ = self._myFile.open( + status, _n = self._myFile.open( fs.unstrip_protocol(path), self.mode, timeout=self.timeout, ) + if not status.ok and "r" in mode and self.fs.locate_all_sources: + self._hosts = self._locate_sources(path) + # Try hosts until you find an openable file + for i_host in range(len(self._hosts)): + self._myFile = client.File() + status, _n = self._myFile.open( + fs.protocol + "://" + self._hosts[i_host] + "/" + path, + self.mode, + timeout=self.timeout, + ) + if status.ok: + # Move hosts that tried and failed to self._dismissed_hosts + self._dismissed_hosts = self._hosts[:i_host] + self._hosts = self._hosts[i_host:] + break + # If above loop cannot find source OR locate_all_sources is off and we + # could not read file initially, end up here if not status.ok: raise OSError(f"File did not open properly: {status.message}") @@ -773,7 +809,6 @@ def __init__( self.metaOffset = _deets.size self.path = path - self.fs = fs self.mode = mode self.blocksize = ( self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size @@ -815,6 +850,59 @@ def __init__( self.location = None self.offset = 0 + def _locate_sources(self, logical_filename: str) -> list[str]: + """Find hosts that have the desired file. + + Gets a list of hosts from the XRootD server that was provided when the + XRootDFile object was instantiated. Note that this implies it will only find + more hosts of the given file if self.fs is a redirector. Implementation of a + solution from the Pepper project in this issue: + + (https://github.com/CoffeaTeam/fsspec-xrootd/issues/36). + + If valid_sources is a non-empty list in fs.storage_options, will only return domain names + that are also in valid_sources + + Parameters + ---------- + logical_filename: The logical filename of the file. (ex: "//store/mc/other/stuff/file.root") + + Returns + ------- + List of domain names that host the file + """ + myclient = self.fs._myclient + # From Pepper: + # The flag PrefName (to get domain names instead of IP addresses) does + # not exist in the Python bindings. However, MAKEPATH has the same value + status, loc = myclient.locate(logical_filename, client.flags.OpenFlags.MAKEPATH) + if loc is None: + raise OSError("XRootD error: " + status.message) + hosts = [] + for r in loc: + if len(r.address.split(":")) > 1: + # Strip off the port number if necessary + clean_address = "".join(r.address.split(":")[:-1]) + else: + clean_address = r.address + if (clean_address in self.fs.valid_sources) or ( + len(self.fs.valid_sources) == 0 + ): + hosts.append(clean_address) + logging.debug(f"Added host {clean_address} to _hosts") + else: + logging.debug( + f"Host {clean_address} not in valid_sources {self.fs.valid_sources}" + ) + if len(hosts) == 0: + err_msg = f"XRootD error: No hosts for file {logical_filename} found using XRootD server {self.fs.storage_options['hostid']}" + if len(self.fs.valid_sources) > 0: + vld_src_msg = f" and valid sources {self.fs.valid_sources}" + raise OSError(err_msg + vld_src_msg) + else: + raise OSError(err_msg) + return hosts + def _fetch_range(self, start: int, end: int) -> Any: status, data = self._myFile.read( self.metaOffset + start, self.metaOffset + end - start, timeout=self.timeout