Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Switch file reading to use concrete endpoints #48

Merged
merged 16 commits into from
Jun 4, 2024
Merged
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 86 additions & 7 deletions src/fsspec_xrootd/xrootd.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

Check warning on line 1 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing module docstring

import asyncio
import io
import logging
import os.path
import warnings
from collections import defaultdict
Expand All @@ -11,18 +12,18 @@

from fsspec.asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper
from fsspec.spec import AbstractBufferedFile
from XRootD import client

Check failure on line 15 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD'
from XRootD.client.flags import (

Check failure on line 16 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client.flags'
DirListFlags,
MkDirFlags,
OpenFlags,
QueryCode,
StatInfoFlags,
)
from XRootD.client.responses import HostList, XRootDStatus

Check failure on line 23 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client.responses'


class ErrorCodes(IntEnum):

Check warning on line 26 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring
INVALID_PATH = 400


Expand All @@ -30,7 +31,7 @@
future: asyncio.Future[tuple[XRootDStatus, Any]],
status: XRootDStatus,
content: Any,
servers: HostList,

Check warning on line 34 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unused argument 'servers'
) -> None:
"""Sets result of _async_wrap() future.

Expand All @@ -49,7 +50,7 @@
return
try:
future.get_loop().call_soon_threadsafe(future.set_result, (status, content))
except Exception as exc:

Check warning on line 53 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Catching too general exception Exception
future.get_loop().call_soon_threadsafe(future.set_exception, exc)


Expand Down Expand Up @@ -139,7 +140,7 @@
return deets


class XRootDFileSystem(AsyncFileSystem): # type: ignore[misc]

Check warning on line 143 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring

Check warning on line 143 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method '_cp_file' is abstract in class 'AsyncFileSystem' but is not overridden in child class 'XRootDFileSystem'

Check warning on line 143 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method '_pipe_file' is abstract in class 'AsyncFileSystem' but is not overridden in child class 'XRootDFileSystem'

Check warning on line 143 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method '_put_file' is abstract in class 'AsyncFileSystem' but is not overridden in child class 'XRootDFileSystem'

Check warning on line 143 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method 'cp_file' is abstract in class 'AbstractFileSystem' but is not overridden in child class 'XRootDFileSystem'

Check warning on line 143 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method 'created' is abstract in class 'AbstractFileSystem' but is not overridden in child class 'XRootDFileSystem'
protocol = "root"
root_marker = "/"
default_timeout = 60
Expand Down Expand Up @@ -168,10 +169,21 @@
If true, synchronous methods will not be available in this instance
loop:
Bring your own loop (for sync methods)
storage_options:
nsmith- marked this conversation as resolved.
Show resolved Hide resolved
Options for the XRootD file system object. Includes (not limited to):
- locate_all_sources = True: bool
- Defaults to True. Finds all locations at which the file is hosted, and chooses
nsmith- marked this conversation as resolved.
Show resolved Hide resolved
from those. Does not let the redirector pick the first to respond.
- valid_sources = []: list
- If given and locate_all_sources is True, fsspec will reject any file host
not in this list. Entries should be bare host names, e.g. `cmsxrootd-site1.fnal.gov`
(no port number).
"""
super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
self.timeout = storage_options.get("timeout", XRootDFileSystem.default_timeout)
self.hostid = hostid
self.locate_all_sources = storage_options.get("locate_all_sources", True)
self.valid_sources = storage_options.get("valid_sources", [])
self._myclient = client.FileSystem("root://" + hostid)
if not self._myclient.url.is_valid():
raise ValueError(f"Invalid hostid: {hostid!r}")
Expand Down Expand Up @@ -702,23 +714,37 @@
if not isinstance(path, str):
raise ValueError(f"Path expected to be string, path: {path}")

self._myFile = client.File()
status, _n = self._myFile.open(
fs.unstrip_protocol(path),
self.mode,
timeout=self.timeout,
)
self.fs = fs

if "r" in mode and self.fs.locate_all_sources:
self._hosts = self._locate_sources(path)
else:
self._hosts = [fs.storage_options["hostid"]]

# Try hosts until you find an openable file
for _i_host in range(len(self._hosts)):
self._myFile = client.File()
status, _n = self._myFile.open(
fs.unstrip_protocol(path),
nsmith- marked this conversation as resolved.
Show resolved Hide resolved
self.mode,
timeout=self.timeout,
)
if status.ok:
break

if not status.ok:
raise OSError(f"File did not open properly: {status.message}")

# Move hosts that tried and failed to self._dismissed_hosts
self._dismissed_hosts = self._hosts[:_i_host]
self._hosts = self._hosts[_i_host:]

self.metaOffset = 0
if "a" in mode:
_stats, _deets = self._myFile.stat(timeout=self.timeout)
self.metaOffset = _deets.size

self.path = path
self.fs = fs
self.mode = mode
self.blocksize = (
self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
Expand Down Expand Up @@ -760,6 +786,59 @@
self.location = None
self.offset = 0

def _locate_sources(self, logical_filename: str) -> list[str]:
    """Find hosts that have the desired file.

    Asks the XRootD server behind ``self.fs`` (the one provided when the
    XRootDFile object was instantiated) to locate ``logical_filename`` and
    returns the reported host addresses with any trailing port number removed.
    Note that this implies it will only find more hosts of the given file if
    ``self.fs`` is a redirector. Implementation of a solution from the Pepper
    project in this issue:

    (https://github.com/CoffeaTeam/fsspec-xrootd/issues/36).

    If ``self.fs.valid_sources`` is non-empty, only hosts that are also listed
    in it are returned.

    Parameters
    ----------
    logical_filename: The logical filename of the file. (ex: "//store/mc/other/stuff/file.root")

    Returns
    -------
    List of host addresses (port stripped) that host the file

    Raises
    ------
    OSError
        If the locate request returns no location info, or if no reported host
        remains after filtering by ``self.fs.valid_sources``.
    """
    valid_sources = self.fs.valid_sources
    # From Pepper:
    # The flag PrefName (to get domain names instead of IP addresses) does
    # not exist in the Python bindings. However, MAKEPATH has the same value
    status, loc = self.fs._myclient.locate(
        logical_filename, client.flags.OpenFlags.MAKEPATH
    )
    if loc is None:
        raise OSError("XRootD error: " + status.message)

    hosts = []
    for r in loc:
        # Strip off the port number if necessary. Split only at the LAST colon
        # so that addresses which themselves contain colons (bracketed IPv6
        # literals such as "[2001:db8::1]:1094") keep their inner colons; an
        # address ending in "]" is a bracketed literal that carries no port.
        host, sep, _port = r.address.rpartition(":")
        if sep and not r.address.endswith("]"):
            clean_address = host
        else:
            clean_address = r.address

        # An empty valid_sources means "accept every host".
        if not valid_sources or clean_address in valid_sources:
            hosts.append(clean_address)
            logging.debug("Added host %s to _hosts", clean_address)
        else:
            logging.debug(
                "Host %s not in valid_sources %s", clean_address, valid_sources
            )

    if not hosts:
        err_msg = (
            f"XRootD error: No hosts for file {logical_filename} found "
            f"using XRootD server {self.fs.storage_options['hostid']}"
        )
        if valid_sources:
            err_msg += f" and valid sources {valid_sources}"
        raise OSError(err_msg)
    return hosts

def _fetch_range(self, start: int, end: int) -> Any:
status, data = self._myFile.read(
self.metaOffset + start, self.metaOffset + end - start, timeout=self.timeout
Expand Down
Loading