Skip to content

Commit

Permalink
add dynamic registration of fsspec implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
deanja committed Feb 15, 2024
1 parent b04d563 commit 43c6180
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 27 deletions.
55 changes: 49 additions & 6 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
)
from urllib.parse import urlparse

from fsspec import AbstractFileSystem, register_implementation
from fsspec.registry import known_implementations, register_implementation
from fsspec import AbstractFileSystem
from fsspec.core import url_to_fs

from dlt import version
Expand Down Expand Up @@ -76,6 +77,52 @@ class FileItem(TypedDict, total=False):
"azure": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
}

CUSTOM_IMPLEMENTATIONS = {
"dummyfs": {
"fq_classname": "dummyfs.DummyFileSystem",
"errtxt": "Dummy only",
},
"gdrive": {
"fq_classname": "dlt.common.storages.fsspecs.google_drive.GoogleDriveFileSystem",
"errtxt": "Please install gdrivefs to access GoogleDriveFileSystem",
},
"gitpythonfs": {
"fq_classname": "dlt.common.storages.implementations.gitpythonfs.GitPythonFileSystem",
"errtxt": "Please install gitpythonfs to access GitPythonFileSystem",
},
}


def register_implementation_in_fsspec(protocol: str) -> None:
"""Dynamically register a filesystem implementation with fsspec.
This is useful if the implementation is not officially known in the fsspec codebase.
The registration's scope is the current process.
Is a no-op if an implementation is already registerd for the given protocol.
Args:
protocol (str): The protocol to register.
Returns: None
"""
if protocol in known_implementations:
return

if not protocol in CUSTOM_IMPLEMENTATIONS:
raise ValueError(
f"Unknown protocol: '{protocol}' is not an fsspec known "
"implementations nor a dlt custom implementations."
)

registration_details = CUSTOM_IMPLEMENTATIONS[protocol]
register_implementation(
protocol,
registration_details["fq_classname"],
errtxt=registration_details["errtxt"],
)


def fsspec_filesystem(
protocol: str,
Expand Down Expand Up @@ -112,11 +159,6 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny:
fs_kwargs: DictStrAny = {"use_listings_cache": False, "listings_expiry_time": 60.0}
credentials = CREDENTIALS_DISPATCH.get(protocol, lambda _: {})(config)

if protocol == "gdrive":
from dlt.common.storages.fsspecs.google_drive import GoogleDriveFileSystem

register_implementation("gdrive", GoogleDriveFileSystem, "GoogleDriveFileSystem")

if config.kwargs is not None:
fs_kwargs.update(config.kwargs)
if config.client_kwargs is not None:
Expand All @@ -142,6 +184,7 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
Returns: (fsspec filesystem, normalized url)
"""
fs_kwargs = prepare_fsspec_args(config)
register_implementation_in_fsspec(config.protocol)

try:
return url_to_fs(config.bucket_url, **fs_kwargs) # type: ignore
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/storages/implementations/gitpythonfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def register_implementation_in_fsspec() -> None:
This is needed if the implementation is not officially registered in the fsspec codebase.
It will also override ("clobber") an existing implementation having the same protocol.
The registration is only valid for the current process.
The registration's scope is the current process.
"""
register_implementation(
"gitpythonfs",
Expand Down
2 changes: 1 addition & 1 deletion tests/common/storages/implementations/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
def pytest_configure(config):
config.addinivalue_line(
"markers", "skip_fsspec_registration: marks test to not use fsspec registration fixture"
"markers", "no_registration_fixture: marks test to not use fsspec registration fixture"
)
50 changes: 32 additions & 18 deletions tests/common/storages/implementations/test_gitpythonfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

import fsspec
from fsspec.registry import (
get_filesystem_class,
known_implementations,
available_protocols,
filesystem,
register_implementation,
)

from git import Repo, BadName
Expand All @@ -27,19 +27,21 @@
test_fs_kwargs = {"skip_instance_cache": True}


@pytest.fixture(scope="module", autouse=True)
@pytest.fixture(autouse=True)
def ensure_fs_registered(request) -> Iterator[None]:
"""Ensure that the gitpythonfs implementation is registered in fsspec"""
if "skip_fsspec_registration" in request.keywords:
pytest.skip("Skipping fsspec registration for marked tests")
try:
if PROTOCOL in known_implementations:
known_implementations.pop(PROTOCOL)
register_implementation_in_fsspec()
if "no_registration_fixture" in request.keywords:
# skip registration for tests marked @pytest.mark.no_registration_fixture
yield None
finally:
if PROTOCOL in known_implementations:
known_implementations.pop(PROTOCOL)
else:
try:
if PROTOCOL in known_implementations:
known_implementations.pop(PROTOCOL)
register_implementation_in_fsspec()
yield None
finally:
if PROTOCOL in known_implementations:
known_implementations.pop(PROTOCOL)


@pytest.fixture()
Expand Down Expand Up @@ -89,23 +91,35 @@ def repo_fixture() -> Iterator[Any]:
shutil.rmtree(repo_path)


@pytest.mark.skip_fsspec_registration
@pytest.mark.no_registration_fixture
def test_register_implementation_in_fsspec() -> None:
"""Test registering a filesystem with fsspec."""
"""Test registering a filesystem implementation with fsspec.
Takes care with state since other tests may be expecting certain
implementations to be registered.
"""
previous_registration_existed = False

# setup
if PROTOCOL in known_implementations:
known_implementations.pop(PROTOCOL)
backup = known_implementations.pop(PROTOCOL)
previous_registration_existed = True

assert (
PROTOCOL not in known_implementations
), f"As a test precondition, {PROTOCOL} should not be registered."

# do and test
register_implementation_in_fsspec()
assert PROTOCOL in available_protocols(), f"{PROTOCOL} should be registered."

cls = get_filesystem_class(PROTOCOL)
assert cls == GitPythonFileSystem

if PROTOCOL in known_implementations:
# teardown
if previous_registration_existed:
register_implementation(PROTOCOL, backup, clobber=True)
assert (
PROTOCOL in available_protocols()
), f"After teardown, {PROTOCOL} should not be registered, which was the original state."
else:
known_implementations.pop(PROTOCOL)
assert (
PROTOCOL not in known_implementations
Expand Down
46 changes: 45 additions & 1 deletion tests/load/filesystem/test_filesystem_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from dlt.common.configuration.inject import with_config
from dlt.common.configuration.specs import AzureCredentials, AzureCredentialsWithoutDefaults
from dlt.common.storages import fsspec_from_config, FilesystemConfiguration
from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH, glob_files
from dlt.common.storages.fsspec_filesystem import (
register_implementation_in_fsspec,
MTIME_DISPATCH,
glob_files,
)
from dlt.common.utils import uniq_id
from tests.common.storages.utils import assert_sample_files
from tests.load.utils import ALL_FILESYSTEM_DRIVERS, AWS_BUCKET
Expand Down Expand Up @@ -42,6 +46,46 @@ def test_filesystem_configuration() -> None:
}


def test_register_implementation_in_fsspec() -> None:
"""Test registering a filesystem implementation with fsspec."""
# ToDo make test more focused and safe with mock. Just want a unit test.
from fsspec.registry import known_implementations, available_protocols, register_implementation

protocol = "dummyfs"
previous_registration_existed = False

# setup
if protocol in known_implementations:
backup = known_implementations.pop(protocol)
previous_registration_existed = True

assert (
protocol not in known_implementations
), f"As a test precondition, {protocol} should not be registered."

# do and test
register_implementation_in_fsspec(protocol)
assert protocol in available_protocols(), f"{protocol} should be registered."

# teardown
if previous_registration_existed:
register_implementation(protocol, backup, clobber=True)
assert (
protocol in available_protocols()
), f"After teardown, {protocol} should not be registered, which was the original state."
else:
known_implementations.pop(protocol)
assert (
protocol not in known_implementations
), f"After teardown, {protocol} should not be registered, which was the original state."


def test_register_unsupported_raises() -> None:
"""Test registering an unsupported filesystem implementation with fsspec raises an error."""
with pytest.raises(ValueError, match=r"Unknown protocol: 'unsupportedfs_t7Y8'"):
register_implementation_in_fsspec("unsupportedfs_t7Y8")


def test_filesystem_instance(with_gdrive_buckets_env: str) -> None:
@retry(stop=stop_after_attempt(10), wait=wait_fixed(1))
def check_file_exists():
Expand Down

0 comments on commit 43c6180

Please sign in to comment.