Fix fsspec multithreading clobbering issue (#898)
Closes #892

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored May 5, 2023
1 parent 1a66f58 commit 4d8c4ea
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions daft/udf_library/url_udfs.py
@@ -27,17 +27,32 @@ def _download(path: str | None) -> bytes | None:
     return fs.cat_file(path)


+def _warmup_fsspec_registry(urls_pylist: list[str | None]) -> None:
+    """HACK: filesystem.get_filesystem calls fsspec.get_filesystem_class under the hood, which throws an error
+    if accessed concurrently for the first time. We "warm" it up in a single-threaded fashion here.
+    This should be fixed in the next release of FSSpec.
+    See: https://github.com/Eventual-Inc/Daft/issues/892
+    """
+    import fsspec
+
+    protocols = {filesystem.get_protocol_from_path(url) for url in urls_pylist if url is not None}
+    for protocol in protocols:
+        fsspec.get_filesystem_class(protocol)
+
+
 @udf(return_dtype=DataType.binary())
 def download_udf(urls, max_worker_threads: int = 8):
     """Downloads the contents of the supplied URLs."""

     from loguru import logger

-    results: list[bytes | None] = []
+    urls_pylist = urls.to_arrow().to_pylist()
+
+    _warmup_fsspec_registry(urls_pylist)
+
     executor = ThreadPoolExecutor(max_workers=max_worker_threads, initializer=_worker_thread_initializer)
-    results = [None for _ in range(len(urls))]
-    urls_pylist = urls.to_arrow().to_pylist()
+    results: list[bytes | None] = [None for _ in range(len(urls))]
     future_to_idx = {executor.submit(_download, urls_pylist[i]): i for i in range(len(urls))}
     for future in as_completed(future_to_idx):
         try:
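The fix amounts to touching fsspec's filesystem-class registry once per protocol on the main thread before any worker thread does. A minimal standalone sketch of that warm-up pattern (not Daft code: it uses fsspec.core.split_protocol in place of Daft's filesystem.get_protocol_from_path helper, and assumes fsspec plus any protocol-specific packages such as s3fs are installed):

# Standalone sketch of the warm-up pattern; URLs here are made up for illustration.
import fsspec
from fsspec.core import split_protocol

urls = ["s3://bucket/a.parquet", "https://example.com/b.bin", None]

# Resolve each protocol once on the main thread so worker threads only ever
# read an already-populated registry instead of racing to import and register
# filesystem implementations on first access.
protocols = {split_protocol(u)[0] or "file" for u in urls if u is not None}
for protocol in protocols:
    fsspec.get_filesystem_class(protocol)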

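For context, download_udf keeps results in input order by keying each future to the index it was submitted for and filling a pre-sized list as futures complete. A simplified, self-contained sketch of that pattern (the _fetch helper below is a made-up stand-in for the real _download):

from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, as_completed


def _fetch(path: str | None) -> bytes | None:
    # Stand-in for _download; just echoes the path as bytes.
    if path is None:
        return None
    return f"contents of {path}".encode()


paths = ["s3://bucket/a.bin", None, "s3://bucket/b.bin"]
results: list[bytes | None] = [None] * len(paths)

with ThreadPoolExecutor(max_workers=8) as executor:
    # Key each future by the index it was submitted for, then write into the
    # pre-sized results list as futures finish, so output order matches input
    # order regardless of completion order.
    future_to_idx = {executor.submit(_fetch, paths[i]): i for i in range(len(paths))}
    for future in as_completed(future_to_idx):
        results[future_to_idx[future]] = future.result()

assert results == [b"contents of s3://bucket/a.bin", None, b"contents of s3://bucket/b.bin"]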