Skip to content

Commit

Permalink
use distinct filesystem instances in pipeline threads
Browse files Browse the repository at this point in the history
- objects yielded by @dlt.resource can run in different threads
- note this does not address possible bug with filesystem instances being cached by fsspec itself.
  • Loading branch information
Your Name committed Feb 16, 2024
1 parent fbbec91 commit 229f7a5
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions sources/filesystem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import dlt
from dlt.common.typing import copy_sig, DictStrAny
from dlt.sources import DltResource
from dlt.sources.filesystem import FileItem, FileItemDict, fsspec_filesystem, glob_files
from dlt.sources.filesystem import FileItem, FileItemDict, glob_files
from dlt.sources.credentials import FileSystemCredentials
from dlt.common.storages.fsspec_filesystem import fsspec_from_config
from dlt.common.storages.configuration import FilesystemConfiguration


from .helpers import (
AbstractFileSystem,
Expand Down Expand Up @@ -90,14 +93,15 @@ def filesystem(
Returns:
Iterator[List[FileItem]]: The list of files.
"""
fs_config = FilesystemConfiguration(bucket_url, credentials, kwargs=kwargs)
if isinstance(credentials, AbstractFileSystem):
fs_client = credentials
else:
fs_client = fsspec_filesystem(bucket_url, credentials, kwargs=kwargs)[0]
fs_client = fsspec_from_config(fs_config)[0]

files_chunk: List[FileItem] = []
for file_model in glob_files(fs_client, bucket_url, file_glob):
file_dict = FileItemDict(file_model, fs_client)
file_dict = FileItemDict(file_model, fs_config)
if extract_content:
file_dict["file_content"] = file_dict.read_bytes()
files_chunk.append(file_dict) # type: ignore
Expand Down

0 comments on commit 229f7a5

Please sign in to comment.