diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index b1cbc11bf9..5e003c66f6 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -20,7 +20,8 @@ ) from urllib.parse import urlparse -from fsspec import AbstractFileSystem, register_implementation +from fsspec.registry import known_implementations, register_implementation +from fsspec import AbstractFileSystem from fsspec.core import url_to_fs from dlt import version @@ -76,6 +77,52 @@ class FileItem(TypedDict, total=False): "azure": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(), } +CUSTOM_IMPLEMENTATIONS = { + "dummyfs": { + "fq_classname": "dummyfs.DummyFileSystem", + "errtxt": "Dummy only", + }, + "gdrive": { + "fq_classname": "dlt.common.storages.fsspecs.google_drive.GoogleDriveFileSystem", + "errtxt": "Please install gdrivefs to access GoogleDriveFileSystem", + }, + "gitpythonfs": { + "fq_classname": "dlt.common.storages.implementations.gitpythonfs.GitPythonFileSystem", + "errtxt": "Please install gitpythonfs to access GitPythonFileSystem", + }, +} + + +def register_implementation_in_fsspec(protocol: str) -> None: + """Dynamically register a filesystem implementation with fsspec. + + This is useful if the implementation is not officially known in the fsspec codebase. + + The registration's scope is the current process. + + Is a no-op if an implementation is already registerd for the given protocol. + + Args: + protocol (str): The protocol to register. + + Returns: None + """ + if protocol in known_implementations: + return + + if not protocol in CUSTOM_IMPLEMENTATIONS: + raise ValueError( + f"Unknown protocol: '{protocol}' is not an fsspec known " + "implementations nor a dlt custom implementations." + ) + + registration_details = CUSTOM_IMPLEMENTATIONS[protocol] + register_implementation( + protocol, + registration_details["fq_classname"], + errtxt=registration_details["errtxt"], + ) + def fsspec_filesystem( protocol: str, @@ -112,11 +159,6 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny: fs_kwargs: DictStrAny = {"use_listings_cache": False, "listings_expiry_time": 60.0} credentials = CREDENTIALS_DISPATCH.get(protocol, lambda _: {})(config) - if protocol == "gdrive": - from dlt.common.storages.fsspecs.google_drive import GoogleDriveFileSystem - - register_implementation("gdrive", GoogleDriveFileSystem, "GoogleDriveFileSystem") - if config.kwargs is not None: fs_kwargs.update(config.kwargs) if config.client_kwargs is not None: @@ -142,6 +184,7 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys Returns: (fsspec filesystem, normalized url) """ fs_kwargs = prepare_fsspec_args(config) + register_implementation_in_fsspec(config.protocol) try: return url_to_fs(config.bucket_url, **fs_kwargs) # type: ignore diff --git a/dlt/common/storages/implementations/gitpythonfs.py b/dlt/common/storages/implementations/gitpythonfs.py index 2ea85e03da..fceb9af3b0 100644 --- a/dlt/common/storages/implementations/gitpythonfs.py +++ b/dlt/common/storages/implementations/gitpythonfs.py @@ -13,7 +13,7 @@ def register_implementation_in_fsspec() -> None: This is needed if the implementation is not officially registered in the fsspec codebase. It will also override ("clobber") an existing implementation having the same protocol. - The registration is only valid for the current process. + The registration's scope is the current process. """ register_implementation( "gitpythonfs", diff --git a/tests/common/storages/implementations/conftest.py b/tests/common/storages/implementations/conftest.py index b99135d8b5..df2cecce47 100644 --- a/tests/common/storages/implementations/conftest.py +++ b/tests/common/storages/implementations/conftest.py @@ -1,4 +1,4 @@ def pytest_configure(config): config.addinivalue_line( - "markers", "skip_fsspec_registration: marks test to not use fsspec registration fixture" + "markers", "no_registration_fixture: marks test to not use fsspec registration fixture" ) diff --git a/tests/common/storages/implementations/test_gitpythonfs.py b/tests/common/storages/implementations/test_gitpythonfs.py index 68e6414235..a1f05a07ef 100644 --- a/tests/common/storages/implementations/test_gitpythonfs.py +++ b/tests/common/storages/implementations/test_gitpythonfs.py @@ -8,10 +8,10 @@ import fsspec from fsspec.registry import ( - get_filesystem_class, known_implementations, available_protocols, filesystem, + register_implementation, ) from git import Repo, BadName @@ -27,19 +27,21 @@ test_fs_kwargs = {"skip_instance_cache": True} -@pytest.fixture(scope="module", autouse=True) +@pytest.fixture(autouse=True) def ensure_fs_registered(request) -> Iterator[None]: """Ensure that the gitpythonfs implementation is registered in fsspec""" - if "skip_fsspec_registration" in request.keywords: - pytest.skip("Skipping fsspec registration for marked tests") - try: - if PROTOCOL in known_implementations: - known_implementations.pop(PROTOCOL) - register_implementation_in_fsspec() + if "no_registration_fixture" in request.keywords: + # skip registration for tests marked @pytest.mark.no_registration_fixture yield None - finally: - if PROTOCOL in known_implementations: - known_implementations.pop(PROTOCOL) + else: + try: + if PROTOCOL in known_implementations: + known_implementations.pop(PROTOCOL) + register_implementation_in_fsspec() + yield None + finally: + if PROTOCOL in known_implementations: + known_implementations.pop(PROTOCOL) @pytest.fixture() @@ -89,23 +91,35 @@ def repo_fixture() -> Iterator[Any]: shutil.rmtree(repo_path) -@pytest.mark.skip_fsspec_registration +@pytest.mark.no_registration_fixture def test_register_implementation_in_fsspec() -> None: - """Test registering a filesystem with fsspec.""" + """Test registering a filesystem implementation with fsspec. + + Takes care with state since other tests may be expecting certain + implementations to be registered. + """ + previous_registration_existed = False + + # setup if PROTOCOL in known_implementations: - known_implementations.pop(PROTOCOL) + backup = known_implementations.pop(PROTOCOL) + previous_registration_existed = True assert ( PROTOCOL not in known_implementations ), f"As a test precondition, {PROTOCOL} should not be registered." + # do and test register_implementation_in_fsspec() assert PROTOCOL in available_protocols(), f"{PROTOCOL} should be registered." - cls = get_filesystem_class(PROTOCOL) - assert cls == GitPythonFileSystem - - if PROTOCOL in known_implementations: + # teardown + if previous_registration_existed: + register_implementation(PROTOCOL, backup, clobber=True) + assert ( + PROTOCOL in available_protocols() + ), f"After teardown, {PROTOCOL} should not be registered, which was the original state." + else: known_implementations.pop(PROTOCOL) assert ( PROTOCOL not in known_implementations diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index 4d370fc786..1987d1274d 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -11,7 +11,11 @@ from dlt.common.configuration.inject import with_config from dlt.common.configuration.specs import AzureCredentials, AzureCredentialsWithoutDefaults from dlt.common.storages import fsspec_from_config, FilesystemConfiguration -from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH, glob_files +from dlt.common.storages.fsspec_filesystem import ( + register_implementation_in_fsspec, + MTIME_DISPATCH, + glob_files, +) from dlt.common.utils import uniq_id from tests.common.storages.utils import assert_sample_files from tests.load.utils import ALL_FILESYSTEM_DRIVERS, AWS_BUCKET @@ -42,6 +46,46 @@ def test_filesystem_configuration() -> None: } +def test_register_implementation_in_fsspec() -> None: + """Test registering a filesystem implementation with fsspec.""" + # ToDo make test more focused and safe with mock. Just want a unit test. + from fsspec.registry import known_implementations, available_protocols, register_implementation + + protocol = "dummyfs" + previous_registration_existed = False + + # setup + if protocol in known_implementations: + backup = known_implementations.pop(protocol) + previous_registration_existed = True + + assert ( + protocol not in known_implementations + ), f"As a test precondition, {protocol} should not be registered." + + # do and test + register_implementation_in_fsspec(protocol) + assert protocol in available_protocols(), f"{protocol} should be registered." + + # teardown + if previous_registration_existed: + register_implementation(protocol, backup, clobber=True) + assert ( + protocol in available_protocols() + ), f"After teardown, {protocol} should not be registered, which was the original state." + else: + known_implementations.pop(protocol) + assert ( + protocol not in known_implementations + ), f"After teardown, {protocol} should not be registered, which was the original state." + + +def test_register_unsupported_raises() -> None: + """Test registering an unsupported filesystem implementation with fsspec raises an error.""" + with pytest.raises(ValueError, match=r"Unknown protocol: 'unsupportedfs_t7Y8'"): + register_implementation_in_fsspec("unsupportedfs_t7Y8") + + def test_filesystem_instance(with_gdrive_buckets_env: str) -> None: @retry(stop=stop_after_attempt(10), wait=wait_fixed(1)) def check_file_exists():