From b04d563633ff36e5f5b2a1b87f637052ab7190e5 Mon Sep 17 00:00:00 2001 From: Dean Jackson <57651082+deanja@users.noreply.github.com> Date: Mon, 29 Jan 2024 17:19:04 +1100 Subject: [PATCH 1/8] Add gitpythonfs fsspec implementation - Intended to be used in filesystem verified Source. -read-only - due to depedency of gitpython on git binaries, gitpythonfs should only be conditionally imported, for example by registering the module using `register_implementation_in_fsspec()` --- .../storages/implementations/__init__.py | 0 .../storages/implementations/gitpythonfs.py | 337 ++++++++++++++++++ .../storages/implementations/__init__.py | 0 .../storages/implementations/conftest.py | 4 + .../implementations/test_gitpythonfs.py | 314 ++++++++++++++++ 5 files changed, 655 insertions(+) create mode 100644 dlt/common/storages/implementations/__init__.py create mode 100644 dlt/common/storages/implementations/gitpythonfs.py create mode 100644 tests/common/storages/implementations/__init__.py create mode 100644 tests/common/storages/implementations/conftest.py create mode 100644 tests/common/storages/implementations/test_gitpythonfs.py diff --git a/dlt/common/storages/implementations/__init__.py b/dlt/common/storages/implementations/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/common/storages/implementations/gitpythonfs.py b/dlt/common/storages/implementations/gitpythonfs.py new file mode 100644 index 0000000000..2ea85e03da --- /dev/null +++ b/dlt/common/storages/implementations/gitpythonfs.py @@ -0,0 +1,337 @@ +from typing import List, Dict, Any, Optional, Union +from os import PathLike +from git import Repo +from git.objects import Blob, Tree, Object +from fsspec.registry import register_implementation +from fsspec.spec import AbstractFileSystem +from fsspec.implementations.memory import MemoryFile +from functools import lru_cache + + +def register_implementation_in_fsspec() -> None: + """Dyanmically register the filesystem with fsspec. 
+ + This is needed if the implementation is not officially registered in the fsspec codebase. + It will also override ("clobber") an existing implementation having the same protocol. + The registration is only valid for the current process. + """ + register_implementation( + "gitpythonfs", + "dlt.common.storages.implementations.gitpythonfs.GitPythonFileSystem", + clobber=True, + errtxt="Please install gitpythonfs to access GitPythonFileSystem", + ) + + +# Thanks to other implementation authors: https://github.com/fsspec/filesystem_spec/tree/master/fsspec/implementations +class GitPythonFileSystem(AbstractFileSystem): + """A filesystem for git repositories on the local filesystem. + + An instance of this class provides the files residing within a local git + repository. You may specify a point in the repo's history, by SHA, branch + or tag (default is HEAD). + + You can retrieve information such as a file's modified time, which would not + be possible if looking at the local filesystem directly. + + It is based on the gitpython library, which could also be used to clone or update + files from a remote repo before reading them with this filesystem. + + Instances of this class cache some git objects so it is not recommended to change + the git repo within the lifetime of an instance. Calling clear_git_caches() may help. + Also note that fsspec itself caches entire instances, which can be overridden with + the `skip_instance_cache=True` keyword argument. + """ + + protocol = "gitpythonfs" + READ_ONLY_MESSAGE = "This fsspec implementation is read-only." + + def __init__( + self, repo_path: Optional[str] = None, ref: Optional[str] = None, **kwargs: Any + ) -> None: + """ + Initialize a GitPythonFS object. + + In functions such as open_files() and url_to_fs(), arguments can + be passed in the url instead of as keyword parameters: + `gitpythonfs://[path-to-repo:][ref@]path/to/file` + (though it doesn't escape [:@] or any other special characters). 
+ Examples: + `gitpythonfs:///some_folder/my_repo:path/to/intro.md` + `gitpythonfs:///some_folder/my_repo:mybranch@path/to/intro.md` + + Args: + repo_path (str): Local location of the Git repo. Defaults to current directory. + ref (str): A branch, tag or commit hash to use. + Defaults to HEAD of the local repo. + """ + super().__init__(**kwargs) + + self.repo_path = repo_path + self.repo = Repo(self.repo_path) + + # error early if bad ref. ToDo: check this whenever ref passed to any method + if ref: + self.repo.commit(ref) + + self.ref = ref or self.repo.head.ref.name + + self._get_tree = lru_cache(maxsize=128)(self._get_tree_uncached) + self._get_revision_details = lru_cache(maxsize=16)(self._get_revision_details_uncached) + + def _get_tree_uncached(self, ref: str) -> Tree: + """Get the tree at repo root for a given ref + + Args: + ref (str): The reference to the commit, branch or tag + + Returns: + git.Tree: The tree object at the root of the repository for the given ref + """ + return self.repo.tree(ref) + + def _get_revision_details_uncached(self, ref: str) -> Dict[str, int]: + """Get the revisions at a given ref for entire repo including subdirectories. + + Args: + ref (str): The reference to the commit, branch or tag + + Returns: + Dict[str, int]: A dictionary mapping file path to file's last modified time + """ + result: Dict[str, int] = parse_git_revlist( + get_revisions_all_raw(self.repo, ref or self.ref) + ) + return result + + def clear_git_caches(self) -> None: + """Clear the git caches. + + This is useful if the repo has changed and you want to force a refresh of the + cached objects. Also note that fsspec itself may cache instances of AbstractFileSystem. 
+ """ + self._get_tree.cache_clear() + self._get_revision_details.cache_clear() + + @classmethod + def _strip_protocol(cls, path: str) -> str: + path = super()._strip_protocol(path).lstrip("/") + if ":" in path: + path = path.split(":", 1)[1] + if "@" in path: + path = path.split("@", 1)[1] + return path.lstrip("/") + + @staticmethod + def _get_kwargs_from_urls(path: str) -> Dict[str, str]: + if path.startswith("gitpythonfs://"): + path = path[14:] + out = {} + if ":" in path: + out["repo_path"], path = path.split(":", 1) + if "@" in path: + out["ref"], path = path.split("@", 1) + return out + + def _git_type_to_file_type(self, git_object: Object) -> str: + if isinstance(git_object, Blob): + return "file" + elif isinstance(git_object, Tree): + return "directory" + else: + msg = ( + "There is no fileystem object type corresponding to Git object type:" + f" {type(git_object).__name__}" + ) + raise TypeError(msg) + + def _details( + self, + git_object: Object, + ref: Optional[str] = None, + include_committed_date: bool = True, + ) -> Dict[str, Union[str, int]]: + """ + Retrieves the details of a Git object. + + Args: + object (git.Object): The Git object to retrieve details for. + include_committed_date (bool, optional): Whether to include the committed date. Defaults to True. + Getting the committed date is an expensive operation and will slow down + walk(), a method that is extensively used by fsspec for find(), glob() etc. + + Returns: + dict: A dictionary containing the details typical for fsspec. + """ + object_type: str = self._git_type_to_file_type(git_object) + + details = { + "name": git_object.path, + "type": self._git_type_to_file_type(git_object), + "mode": f"{git_object.mode:o}", + "mime_type": git_object.mime_type if object_type == "file" else None, + "size": git_object.size, + "hex": git_object.hexsha, + } + + # Only get expensive details if needed. 
+ if object_type == "file" and include_committed_date: + details["mtime"] = self._get_revision_details(ref or self.ref)[git_object.path] + + return details + + def ls( + self, + path: str, + detail: bool = False, + ref: Optional[str] = None, + **kwargs: Any, + ) -> List[Any]: + """List files at given path in the repo.""" + path = self._strip_protocol(path) + results: List[Any] = [] + + # GitPython recommends always starting at root of repo. + tree = self._get_tree(ref or self.ref) + + object_at_path = tree if path == "" else tree / path + if isinstance(object_at_path, Tree): + if detail: + for git_object in object_at_path: + results.append(self._details(git_object, ref or self.ref, **kwargs)) + return results + else: + for git_object in object_at_path: + results.append(git_object.path) + return results + else: + # path is to a single blob. + if detail: + results.append(self._details(object_at_path, ref or self.ref, **kwargs)) + return results + else: + results.append(object_at_path.path) + return results + + def _open( + self, + path: str, + mode: str = "rb", + block_size: Optional[int] = None, + autocommit: bool = True, + cache_options: Optional[Any] = None, + ref: Optional[str] = None, + **kwargs: Any, + ) -> MemoryFile: + path = self._strip_protocol(path) + tree = self._get_tree(ref or self.ref) + blob = tree / path + return MemoryFile(data=blob.data_stream.read()) + + def mv(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def rm(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def touch(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def mkdir(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def mkdirs(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def rmdir(self, *args: Any, **kwargs: Any) -> None: + raise 
NotImplementedError(self.READ_ONLY_MESSAGE) + + def put_file(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def put(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def cp_file(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def copy(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def rm_file(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def _rm(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def chmod(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + def chown(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError(self.READ_ONLY_MESSAGE) + + +def get_revisions_all_raw(repo: Repo, ref: str) -> str: + """ + Get the git revisions at a given ref for entire repo including subdirectories. + + Is much faster than iterating commits in GitPython. + + Args: + repo (git.Repo): The git repository object. + ref (str): The reference (commit, branch, tag, etc.) to get the revisions from. + + Returns: + str: The revisions at the given ref, as would be on stdout". + """ + + git_cmd_runner = repo.git + + # git uses fnmatch(3) style matching + path_spec = ":(top)" + + out: str = git_cmd_runner.log(ref, path_spec, raw=True, no_merges=True, pretty="%at") + return out + + +def parse_git_revlist(git_cmd_output: str) -> Dict[str, int]: + """ + Parse raw text output produced by git rev-list + + `git log` and `git whatchanged` have similar output. + + Args: + raw_text (str): The git cmd output to parse. 
+ + Returns: + dict: The parsed revisions info as a dictionary with: + key: path to file + value: most recent commit timestamp for file + """ + revisions_info = {} + + lines = git_cmd_output.splitlines() + for line in lines: + line = line.strip() + if not line: + continue + + if not line.startswith(":"): + # it's the line with the pretty format info + committed_at = int(line) + else: + filepath = line.split("\t")[-1] + # git outputs revisions newest first. So we ignore a files we've + # already seen it because we only want metadata for the latest + # commit for each file. + if filepath not in revisions_info: + revisions_info[filepath] = committed_at + + return revisions_info + + +def get_revisions_all(repo: Repo, ref: str) -> Dict[str, int]: + """Example of getting revisions raw output and parsing it.""" + + raw = get_revisions_all_raw(repo, ref) + return parse_git_revlist(raw) diff --git a/tests/common/storages/implementations/__init__.py b/tests/common/storages/implementations/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/common/storages/implementations/conftest.py b/tests/common/storages/implementations/conftest.py new file mode 100644 index 0000000000..b99135d8b5 --- /dev/null +++ b/tests/common/storages/implementations/conftest.py @@ -0,0 +1,4 @@ +def pytest_configure(config): + config.addinivalue_line( + "markers", "skip_fsspec_registration: marks test to not use fsspec registration fixture" + ) diff --git a/tests/common/storages/implementations/test_gitpythonfs.py b/tests/common/storages/implementations/test_gitpythonfs.py new file mode 100644 index 0000000000..68e6414235 --- /dev/null +++ b/tests/common/storages/implementations/test_gitpythonfs.py @@ -0,0 +1,314 @@ +import os +import subprocess +import tempfile +import shutil + +import pytest +from typing import Iterator, Any + +import fsspec +from fsspec.registry import ( + get_filesystem_class, + known_implementations, + available_protocols, + filesystem, +) + +from git 
import Repo, BadName + +from dlt.common.storages.implementations.gitpythonfs import ( + GitPythonFileSystem, + register_implementation_in_fsspec, + get_revisions_all_raw, + parse_git_revlist, +) + +PROTOCOL = GitPythonFileSystem.protocol +test_fs_kwargs = {"skip_instance_cache": True} + + +@pytest.fixture(scope="module", autouse=True) +def ensure_fs_registered(request) -> Iterator[None]: + """Ensure that the gitpythonfs implementation is registered in fsspec""" + if "skip_fsspec_registration" in request.keywords: + pytest.skip("Skipping fsspec registration for marked tests") + try: + if PROTOCOL in known_implementations: + known_implementations.pop(PROTOCOL) + register_implementation_in_fsspec() + yield None + finally: + if PROTOCOL in known_implementations: + known_implementations.pop(PROTOCOL) + + +@pytest.fixture() +def repo_fixture() -> Iterator[Any]: + """Create a temporary git repository. + + Thanks to https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/tests/test_git.py + """ + original_working_dir = os.getcwd() + repo_path = tempfile.mkdtemp() + try: + os.chdir(repo_path) + subprocess.call("git init", shell=True, cwd=repo_path) + subprocess.call("git checkout -b master", shell=True, cwd=repo_path) + subprocess.call("git config user.email 'you@example.com'", shell=True, cwd=repo_path) + subprocess.call("git config user.name 'Your Name'", shell=True, cwd=repo_path) + + open(os.path.join(repo_path, "file1"), "w", encoding="utf-8").write("four5") + subprocess.call("git add file1", shell=True, cwd=repo_path) + subprocess.call("git commit -m 'init'", shell=True, cwd=repo_path) + sha_first = ( + subprocess.check_output("git rev-parse HEAD", shell=True, cwd=repo_path) + .strip() + .decode("utf-8") + ) + + open(os.path.join(repo_path, "file1"), "w", encoding="utf-8").write("four56") + subprocess.call("git commit -a -m 'tagger'", shell=True, cwd=repo_path) + subprocess.call("git tag -a thetag -m 'make tag'", shell=True, cwd=repo_path) + + 
open(os.path.join(repo_path, "file2"), "w", encoding="utf-8").write("four567") + subprocess.call("git add file2", shell=True) + subprocess.call("git commit -m 'master tip'", shell=True, cwd=repo_path) + + subprocess.call("git checkout -b abranch", shell=True, cwd=repo_path) + os.mkdir("inner") + open(os.path.join(repo_path, "inner", "file3"), "w", encoding="utf-8").write("four5") + subprocess.call("git add inner/file3", shell=True, cwd=repo_path) + open(os.path.join(repo_path, "inner", "file4"), "w", encoding="utf-8").write("four5") + subprocess.call("git add inner/file4", shell=True, cwd=repo_path) + subprocess.call("git commit -m 'abranch tip'", shell=True, cwd=repo_path) + + os.chdir(original_working_dir) + yield repo_path, sha_first + finally: + os.chdir(original_working_dir) + shutil.rmtree(repo_path) + + +@pytest.mark.skip_fsspec_registration +def test_register_implementation_in_fsspec() -> None: + """Test registering a filesystem with fsspec.""" + if PROTOCOL in known_implementations: + known_implementations.pop(PROTOCOL) + + assert ( + PROTOCOL not in known_implementations + ), f"As a test precondition, {PROTOCOL} should not be registered." + + register_implementation_in_fsspec() + assert PROTOCOL in available_protocols(), f"{PROTOCOL} should be registered." + + cls = get_filesystem_class(PROTOCOL) + assert cls == GitPythonFileSystem + + if PROTOCOL in known_implementations: + known_implementations.pop(PROTOCOL) + assert ( + PROTOCOL not in known_implementations + ), f"After teardown, {PROTOCOL} should not be registered, which was the original state." 
+ + +def test_instantiate_fsspec_filesystem(repo_fixture: Iterator[Any]) -> None: + """Test instantiating a filesystem with fsspec.""" + d, _ = repo_fixture + + fs = filesystem(PROTOCOL, repo_path=d, **test_fs_kwargs) + assert type(fs) == GitPythonFileSystem + + +def test_ls_entries(repo_fixture: Iterator[Any]) -> None: + """Test listing folders and files in a repository.""" + d, _ = repo_fixture + fs = filesystem(PROTOCOL, repo_path=d, **test_fs_kwargs) + + assert fs.ls("") == [ + "file1", + "file2", + "inner", + ], "Should return all objects at root of repo." + assert fs.ls("file1") == ["file1"], "Should return a single file at root." + assert fs.ls("inner") == [ + "inner/file3", + "inner/file4", + ], "Should return 2 files, with their paths." + assert fs.ls("inner/file3") == ["inner/file3"], "Should return a single file in folder." + + +def test_ls_file_details(repo_fixture: Iterator[Any]) -> None: + """Test showing details for a file (git.Blob) in a repository.""" + + # setup + d, _ = repo_fixture + fs = filesystem(PROTOCOL, repo_path=d, **test_fs_kwargs) + + # do + files = fs.ls("file1", detail=True, include_committed_date=True) + assert len(files) == 1, "Should return a single object." + details = files[0] + + # assert + assert details["name"] == "file1" + assert details["type"] == "file" + assert details["mime_type"] == "text/plain" + assert isinstance(details["size"], int) + assert isinstance(details["hex"], str) + assert isinstance( + details["mode"], str + ), "Should be a string representation of octal, without the 0o prefix." + assert isinstance(details["committed_date"], int) + + +def test_git_refs(repo_fixture: Iterator[Any]) -> None: + """Test results for git refs - eg commit sha, branch, tag.""" + d, _ = repo_fixture + + with fsspec.open("gitpythonfs://inner/file3", repo_path=d, **test_fs_kwargs) as f: + assert f.read() == b"four5", "Should read from HEAD if no ref given." 
+ + try: + with fsspec.open( + "gitpythonfs://inner/file3", repo_path=d, ref="HEAD", **test_fs_kwargs + ) as f: + f.read() + except BadName: + pytest.fail("Should accept HEAD as a ref.") + + with pytest.raises(BadName): + with fsspec.open( + "gitpythonfs://file1", + repo_path=d, + ref="somenonexisentgitref", + **test_fs_kwargs, + ) as f: + _ = f.read() + + +def test_git_refs_on_open(repo_fixture: Iterator[Any]) -> None: + d, sha_first = repo_fixture + + with fsspec.open("gitpythonfs://file1", repo_path=d, ref=sha_first, **test_fs_kwargs) as f: + assert f.read() == b"four5", "Should read file version at given sha (aka commit id)." + + with fsspec.open("gitpythonfs://file1", repo_path=d, ref="thetag", **test_fs_kwargs) as f: + assert f.read() == b"four56", "Should read file version at given tag." + + +def test_git_refs_on_ls(repo_fixture: Iterator[Any]) -> None: + d, sha_first = repo_fixture + + fs = filesystem(PROTOCOL, repo_path=d, ref=sha_first, **test_fs_kwargs) + files = fs.ls("file1", detail=True) + assert len(files) == 1, "Should return a single object." + assert files[0]["size"] == 5, "Should return file size as at the sha given in constructor." + + fs = filesystem(PROTOCOL, repo_path=d, **test_fs_kwargs) + files = fs.ls("file1", ref=sha_first, detail=True) + assert len(files) == 1, "Should return a single object." + assert files[0]["size"] == 5, "Should return file size as at sha given in ls()." + + fs = filesystem(PROTOCOL, repo_path=d, ref="HEAD", **test_fs_kwargs) + files = fs.ls("file1", ref=sha_first, detail=True) + assert len(files) == 1, "Should return a single object." + assert files[0]["size"] == 5, "ls() ref should override constructor ref." 
+ + +def test_get_kwargs_from_urls() -> None: + """Test getting kwargs from url.""" + repo_path = "/some/path/to/repo" + ref = "some_tag" + + url = f"gitpythonfs://{repo_path}:{ref}@file1" + kwargs = GitPythonFileSystem._get_kwargs_from_urls(url) + assert kwargs["repo_path"] == repo_path + assert kwargs["ref"] == ref + + +def test_url(repo_fixture: Iterator[Any]) -> None: + """Test reading a file from a repository via url. + + For supported url formats see GitPythonFileSystem class docstring""" + + d, sha_first = repo_fixture + + with fsspec.open("gitpythonfs://file1", repo_path=d, **test_fs_kwargs) as f: + assert f.read() == b"four56", "Should return file at root." + + with fsspec.open(f"gitpythonfs://{d}:file1", **test_fs_kwargs) as f: + assert f.read() == b"four56", "Should return file via the repo path embedded in the url." + + with fsspec.open(f"gitpythonfs://{d}:{sha_first}@file1", **test_fs_kwargs) as f: + assert ( + f.read() == b"four5" + ), "Should return file via the repo path and git ref embedded in the url." + + +def test_multiple_files(repo_fixture: Iterator[Any]) -> None: + """Test reading multiple files from a repository.""" + d, _ = repo_fixture + + files = fsspec.open_files(f"gitpythonfs://{d}:**file*", **test_fs_kwargs) + assert len(files) == 4, "Glob should recurse folders and return 4 files that start with `file`." 
+ + +def test_non_readonly_raises_exception(repo_fixture: Iterator[Any]) -> None: + """Test that non-readonly operations raise an exception.""" + d, _ = repo_fixture + + with pytest.raises(NotImplementedError): + GitPythonFileSystem(d, **test_fs_kwargs).mv() # type: ignore + + +def test_get_revisions_all_raw(repo_fixture: Iterator[Any]) -> None: + """Test getting all revisions.""" + d, _ = repo_fixture + repo = Repo(d) + + result = get_revisions_all_raw(repo, "HEAD") + + assert not result == "", "Should return some info" + assert all( + filename in result for filename in ["file1", "file2", "inner/file3", "inner/file4"] + ), "Should return info for all files in repo" + + +def test_get_revisions_all_raw_at_ref(repo_fixture: Iterator[Any]) -> None: + """Test getting all revisions at ref.""" + d, sha_first = repo_fixture + repo = Repo(d) + + result = get_revisions_all_raw(repo, sha_first) + + assert "file1" in result, "Should return info for one file that exists at ref" + assert "file2" not in result, "Should not return info for file not existent at ref" + + +def test_parse_git_revlist() -> None: + """Test parsing git revlist output.""" + + git_cmd_output = ( + # for clarity, broken into neighbouring string constants, which Python + # automatically concatenates again to be like the original. 
+ "1703550238\n" + "\n" + ":000000 100644 0000000 cdca2c1 A\tinner/file3\n" + ":000000 100644 0000000 488d13d A\tinner/file4\n" + "1703550237\n\n:000000 100644 0000000 aff88ce A\tfile2\n" + "1703550236\n" + "\n" + ":100644 100644 1c37a15 a906852 M\tfile1\n" + "1703550235\n" + "\n" + ":000000 100644 0000000 1c37a15 A\tfile1" + ) + + result = parse_git_revlist(git_cmd_output) + assert result == { + "inner/file3": 1703550238, + "inner/file4": 1703550238, + "file2": 1703550237, + "file1": 1703550236, + }, "Should return dict with latest timetamp for each file" From 43c61807c6f7520ab127c029e13c70c805bc61cc Mon Sep 17 00:00:00 2001 From: Dean Jackson <57651082+deanja@users.noreply.github.com> Date: Thu, 1 Feb 2024 16:38:47 +1100 Subject: [PATCH 2/8] add dynamic registration of fsspec implementations --- dlt/common/storages/fsspec_filesystem.py | 55 +++++++++++++++++-- .../storages/implementations/gitpythonfs.py | 2 +- .../storages/implementations/conftest.py | 2 +- .../implementations/test_gitpythonfs.py | 50 +++++++++++------ .../load/filesystem/test_filesystem_common.py | 46 +++++++++++++++- 5 files changed, 128 insertions(+), 27 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index b1cbc11bf9..5e003c66f6 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -20,7 +20,8 @@ ) from urllib.parse import urlparse -from fsspec import AbstractFileSystem, register_implementation +from fsspec.registry import known_implementations, register_implementation +from fsspec import AbstractFileSystem from fsspec.core import url_to_fs from dlt import version @@ -76,6 +77,52 @@ class FileItem(TypedDict, total=False): "azure": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(), } +CUSTOM_IMPLEMENTATIONS = { + "dummyfs": { + "fq_classname": "dummyfs.DummyFileSystem", + "errtxt": "Dummy only", + }, + "gdrive": { + "fq_classname": 
"dlt.common.storages.fsspecs.google_drive.GoogleDriveFileSystem", + "errtxt": "Please install gdrivefs to access GoogleDriveFileSystem", + }, + "gitpythonfs": { + "fq_classname": "dlt.common.storages.implementations.gitpythonfs.GitPythonFileSystem", + "errtxt": "Please install gitpythonfs to access GitPythonFileSystem", + }, +} + + +def register_implementation_in_fsspec(protocol: str) -> None: + """Dynamically register a filesystem implementation with fsspec. + + This is useful if the implementation is not officially known in the fsspec codebase. + + The registration's scope is the current process. + + Is a no-op if an implementation is already registerd for the given protocol. + + Args: + protocol (str): The protocol to register. + + Returns: None + """ + if protocol in known_implementations: + return + + if not protocol in CUSTOM_IMPLEMENTATIONS: + raise ValueError( + f"Unknown protocol: '{protocol}' is not an fsspec known " + "implementations nor a dlt custom implementations." + ) + + registration_details = CUSTOM_IMPLEMENTATIONS[protocol] + register_implementation( + protocol, + registration_details["fq_classname"], + errtxt=registration_details["errtxt"], + ) + def fsspec_filesystem( protocol: str, @@ -112,11 +159,6 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny: fs_kwargs: DictStrAny = {"use_listings_cache": False, "listings_expiry_time": 60.0} credentials = CREDENTIALS_DISPATCH.get(protocol, lambda _: {})(config) - if protocol == "gdrive": - from dlt.common.storages.fsspecs.google_drive import GoogleDriveFileSystem - - register_implementation("gdrive", GoogleDriveFileSystem, "GoogleDriveFileSystem") - if config.kwargs is not None: fs_kwargs.update(config.kwargs) if config.client_kwargs is not None: @@ -142,6 +184,7 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys Returns: (fsspec filesystem, normalized url) """ fs_kwargs = prepare_fsspec_args(config) + 
register_implementation_in_fsspec(config.protocol) try: return url_to_fs(config.bucket_url, **fs_kwargs) # type: ignore diff --git a/dlt/common/storages/implementations/gitpythonfs.py b/dlt/common/storages/implementations/gitpythonfs.py index 2ea85e03da..fceb9af3b0 100644 --- a/dlt/common/storages/implementations/gitpythonfs.py +++ b/dlt/common/storages/implementations/gitpythonfs.py @@ -13,7 +13,7 @@ def register_implementation_in_fsspec() -> None: This is needed if the implementation is not officially registered in the fsspec codebase. It will also override ("clobber") an existing implementation having the same protocol. - The registration is only valid for the current process. + The registration's scope is the current process. """ register_implementation( "gitpythonfs", diff --git a/tests/common/storages/implementations/conftest.py b/tests/common/storages/implementations/conftest.py index b99135d8b5..df2cecce47 100644 --- a/tests/common/storages/implementations/conftest.py +++ b/tests/common/storages/implementations/conftest.py @@ -1,4 +1,4 @@ def pytest_configure(config): config.addinivalue_line( - "markers", "skip_fsspec_registration: marks test to not use fsspec registration fixture" + "markers", "no_registration_fixture: marks test to not use fsspec registration fixture" ) diff --git a/tests/common/storages/implementations/test_gitpythonfs.py b/tests/common/storages/implementations/test_gitpythonfs.py index 68e6414235..a1f05a07ef 100644 --- a/tests/common/storages/implementations/test_gitpythonfs.py +++ b/tests/common/storages/implementations/test_gitpythonfs.py @@ -8,10 +8,10 @@ import fsspec from fsspec.registry import ( - get_filesystem_class, known_implementations, available_protocols, filesystem, + register_implementation, ) from git import Repo, BadName @@ -27,19 +27,21 @@ test_fs_kwargs = {"skip_instance_cache": True} -@pytest.fixture(scope="module", autouse=True) +@pytest.fixture(autouse=True) def ensure_fs_registered(request) -> Iterator[None]: 
"""Ensure that the gitpythonfs implementation is registered in fsspec""" - if "skip_fsspec_registration" in request.keywords: - pytest.skip("Skipping fsspec registration for marked tests") - try: - if PROTOCOL in known_implementations: - known_implementations.pop(PROTOCOL) - register_implementation_in_fsspec() + if "no_registration_fixture" in request.keywords: + # skip registration for tests marked @pytest.mark.no_registration_fixture yield None - finally: - if PROTOCOL in known_implementations: - known_implementations.pop(PROTOCOL) + else: + try: + if PROTOCOL in known_implementations: + known_implementations.pop(PROTOCOL) + register_implementation_in_fsspec() + yield None + finally: + if PROTOCOL in known_implementations: + known_implementations.pop(PROTOCOL) @pytest.fixture() @@ -89,23 +91,35 @@ def repo_fixture() -> Iterator[Any]: shutil.rmtree(repo_path) -@pytest.mark.skip_fsspec_registration +@pytest.mark.no_registration_fixture def test_register_implementation_in_fsspec() -> None: - """Test registering a filesystem with fsspec.""" + """Test registering a filesystem implementation with fsspec. + + Takes care with state since other tests may be expecting certain + implementations to be registered. + """ + previous_registration_existed = False + + # setup if PROTOCOL in known_implementations: - known_implementations.pop(PROTOCOL) + backup = known_implementations.pop(PROTOCOL) + previous_registration_existed = True assert ( PROTOCOL not in known_implementations ), f"As a test precondition, {PROTOCOL} should not be registered." + # do and test register_implementation_in_fsspec() assert PROTOCOL in available_protocols(), f"{PROTOCOL} should be registered." 
- cls = get_filesystem_class(PROTOCOL) - assert cls == GitPythonFileSystem - - if PROTOCOL in known_implementations: + # teardown + if previous_registration_existed: + register_implementation(PROTOCOL, backup, clobber=True) + assert ( + PROTOCOL in available_protocols() + ), f"After teardown, {PROTOCOL} should not be registered, which was the original state." + else: known_implementations.pop(PROTOCOL) assert ( PROTOCOL not in known_implementations diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index 4d370fc786..1987d1274d 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -11,7 +11,11 @@ from dlt.common.configuration.inject import with_config from dlt.common.configuration.specs import AzureCredentials, AzureCredentialsWithoutDefaults from dlt.common.storages import fsspec_from_config, FilesystemConfiguration -from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH, glob_files +from dlt.common.storages.fsspec_filesystem import ( + register_implementation_in_fsspec, + MTIME_DISPATCH, + glob_files, +) from dlt.common.utils import uniq_id from tests.common.storages.utils import assert_sample_files from tests.load.utils import ALL_FILESYSTEM_DRIVERS, AWS_BUCKET @@ -42,6 +46,46 @@ def test_filesystem_configuration() -> None: } +def test_register_implementation_in_fsspec() -> None: + """Test registering a filesystem implementation with fsspec.""" + # ToDo make test more focused and safe with mock. Just want a unit test. 
+ from fsspec.registry import known_implementations, available_protocols, register_implementation + + protocol = "dummyfs" + previous_registration_existed = False + + # setup + if protocol in known_implementations: + backup = known_implementations.pop(protocol) + previous_registration_existed = True + + assert ( + protocol not in known_implementations + ), f"As a test precondition, {protocol} should not be registered." + + # do and test + register_implementation_in_fsspec(protocol) + assert protocol in available_protocols(), f"{protocol} should be registered." + + # teardown + if previous_registration_existed: + register_implementation(protocol, backup, clobber=True) + assert ( + protocol in available_protocols() + ), f"After teardown, {protocol} should not be registered, which was the original state." + else: + known_implementations.pop(protocol) + assert ( + protocol not in known_implementations + ), f"After teardown, {protocol} should not be registered, which was the original state." + + +def test_register_unsupported_raises() -> None: + """Test registering an unsupported filesystem implementation with fsspec raises an error.""" + with pytest.raises(ValueError, match=r"Unknown protocol: 'unsupportedfs_t7Y8'"): + register_implementation_in_fsspec("unsupportedfs_t7Y8") + + def test_filesystem_instance(with_gdrive_buckets_env: str) -> None: @retry(stop=stop_after_attempt(10), wait=wait_fixed(1)) def check_file_exists(): From c16a9dccd4f5837b9890ddfbdb48ee69dcb530d2 Mon Sep 17 00:00:00 2001 From: Dean Jackson <57651082+deanja@users.noreply.github.com> Date: Fri, 2 Feb 2024 14:37:17 +1100 Subject: [PATCH 3/8] Urls do not require a netloc (in rfc1808) - See https://www.rfc-editor.org/rfc/rfc1808.html#section-2.1 . So, for example, gitpythonfs:// is valid and works in fsspec. - However in bucket-based systems, the bucket name is where the netloc would be - eg s3://bucket_name. 
But luckily s3fs (and probably az, gcs etc) already gives helpful error messages if no bucket name given, wrong bucket name given etc. - As a ToDo, this rule could test only those protocols whose fsspec implementation needs netloc, rather than excluding a few protocols as is done here. But we don't know which protocols this rule was initially added for. --- dlt/common/storages/configuration.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py index 2cbe7c78d5..5c63304bd0 100644 --- a/dlt/common/storages/configuration.py +++ b/dlt/common/storages/configuration.py @@ -115,11 +115,12 @@ def protocol(self) -> str: def on_resolved(self) -> None: url = urlparse(self.bucket_url) - if not url.path and not url.netloc: - raise ConfigurationValueError( - "File path or netloc missing. Field bucket_url of FilesystemClientConfiguration" - " must contain valid url with a path or host:password component." - ) + if url.scheme not in ("gitpythonfs", "github", "git"): + if not url.path and not url.netloc: + raise ConfigurationValueError( + "File path or netloc missing. Field bucket_url of FilesystemClientConfiguration" + " must contain valid url with a path or host:password component." + ) # this is just a path in a local file system if url.path == self.bucket_url: url = url._replace(scheme="file") From ea0557ff6ab40bfce3ca9f890dceba2b780199e7 Mon Sep 17 00:00:00 2001 From: Dean Jackson <57651082+deanja@users.noreply.github.com> Date: Tue, 6 Feb 2024 15:10:40 +1100 Subject: [PATCH 4/8] Use 'mtime' as default input for `modification_date` -reduces need to modify code every time a new fsspec implementation is added - `mtime` is idiomatic in *nix file systems. 
--- dlt/common/storages/fsspec_filesystem.py | 57 ++++++++++++++----- dlt/common/storages/transactional_file.py | 9 ++- .../implementations/test_gitpythonfs.py | 2 +- .../load/filesystem/test_filesystem_common.py | 4 +- 4 files changed, 49 insertions(+), 23 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 5e003c66f6..036a86c225 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -49,21 +49,20 @@ class FileItem(TypedDict, total=False): file_content: Optional[bytes] -# Map of protocol to mtime resolver -# we only need to support a small finite set of protocols -MTIME_DISPATCH = { - "s3": lambda f: ensure_pendulum_datetime(f["LastModified"]), - "adl": lambda f: ensure_pendulum_datetime(f["LastModified"]), - "az": lambda f: ensure_pendulum_datetime(f["last_modified"]), - "gcs": lambda f: ensure_pendulum_datetime(f["updated"]), - "file": lambda f: ensure_pendulum_datetime(f["mtime"]), - "memory": lambda f: ensure_pendulum_datetime(f["created"]), - "gdrive": lambda f: ensure_pendulum_datetime(f["modifiedTime"]), +DEFAULT_MTIME_FIELD_NAME = "mtime" +MTIME_FIELD_NAMES = { + "file": "mtime", + "s3": "LastModified", + "adl": "LastModified", + "az": "last_modified", + "gcs": "updated", + "memory": "created", + "gdrive": "modifiedTime", } # Support aliases -MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"] -MTIME_DISPATCH["s3a"] = MTIME_DISPATCH["s3"] -MTIME_DISPATCH["abfs"] = MTIME_DISPATCH["az"] +MTIME_FIELD_NAMES["gs"] = MTIME_FIELD_NAMES["gcs"] +MTIME_FIELD_NAMES["s3a"] = MTIME_FIELD_NAMES["s3"] +MTIME_FIELD_NAMES["abfs"] = MTIME_FIELD_NAMES["az"] # Map of protocol to a filesystem type CREDENTIALS_DISPATCH: Dict[str, Callable[[FilesystemConfiguration], DictStrAny]] = { @@ -110,7 +109,7 @@ def register_implementation_in_fsspec(protocol: str) -> None: if protocol in known_implementations: return - if not protocol in CUSTOM_IMPLEMENTATIONS: + if protocol not in 
CUSTOM_IMPLEMENTATIONS: raise ValueError( f"Unknown protocol: '{protocol}' is not an fsspec known " "implementations nor a dlt custom implementations." @@ -304,6 +303,32 @@ def guess_mime_type(file_name: str) -> Sequence[str]: return type_ +def extract_mtime(file_metadata: Dict[str, Any], protocol: str = None) -> pendulum.DateTime: + """Extract the modification time from file listing metadata. + + Args: + file_metadata (Dict[str, Any]): The file metadata. + protocol (str) [Optional]: The protocol. If not provided, None or not a known protocol, + then default field name `mtime` is tried. `mtime` is used for the "file" fsspec + implementation and our custom fsspec implementations. + + Returns: + pendulum.DateTime: The modification time. + + Raises: + KeyError: If the resolved field name is not found in the metadata. Current dlt use-cases + depend on a modified date. For example, transactional files, incremental destination + loading. + """ + field_name = MTIME_FIELD_NAMES.get(protocol, DEFAULT_MTIME_FIELD_NAME) + try: + return ensure_pendulum_datetime(file_metadata[field_name]) + except KeyError: + if protocol not in MTIME_FIELD_NAMES: + extra_message = " {DEFAULT_MTIME_FIELD_NAME} was used by default." 
+ raise KeyError(f"`{field_name}` not found in metadata.{extra_message}") + + def glob_files( fs_client: AbstractFileSystem, bucket_url: str, file_glob: str = "**" ) -> Iterator[FileItem]: @@ -350,12 +375,14 @@ def glob_files( path=posixpath.join(bucket_url_parsed.path, file_name) ).geturl() + modification_date = extract_mtime(md, bucket_url_parsed.scheme) + mime_type, encoding = guess_mime_type(file_name) yield FileItem( file_name=file_name, file_url=file_url, mime_type=mime_type, encoding=encoding, - modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md), + modification_date=modification_date, size_in_bytes=int(md["size"]), ) diff --git a/dlt/common/storages/transactional_file.py b/dlt/common/storages/transactional_file.py index e5ee220904..0fe9c310cf 100644 --- a/dlt/common/storages/transactional_file.py +++ b/dlt/common/storages/transactional_file.py @@ -16,7 +16,7 @@ import fsspec from dlt.common.pendulum import pendulum, timedelta -from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH +from dlt.common.storages.fsspec_filesystem import extract_mtime def lock_id(k: int = 4) -> str: @@ -56,8 +56,7 @@ def __init__(self, path: str, fs: fsspec.AbstractFileSystem) -> None: path: The path to lock. fs: The fsspec file system. """ - proto = fs.protocol[0] if isinstance(fs.protocol, (list, tuple)) else fs.protocol - self.extract_mtime = MTIME_DISPATCH.get(proto, MTIME_DISPATCH["file"]) + self._proto = fs.protocol[0] if isinstance(fs.protocol, (list, tuple)) else fs.protocol parsed_path = Path(path) if not parsed_path.is_absolute(): @@ -65,7 +64,7 @@ def __init__(self, path: str, fs: fsspec.AbstractFileSystem) -> None: f"{path} is not absolute. Please pass only absolute paths to TransactionalFile" ) self.path = path - if proto == "file": + if self._proto == "file": # standardize path separator to POSIX. fsspec always uses POSIX. Windows may use either. 
self.path = parsed_path.as_posix() @@ -103,7 +102,7 @@ def _sync_locks(self) -> t.List[str]: if not name.startswith(self.lock_prefix): continue # Purge stale locks - mtime = self.extract_mtime(lock) + mtime = extract_mtime(lock, self._proto) if now - mtime > timedelta(seconds=TransactionalFile.LOCK_TTL_SECONDS): try: # Janitors can race, so we ignore errors self._fs.rm(name) diff --git a/tests/common/storages/implementations/test_gitpythonfs.py b/tests/common/storages/implementations/test_gitpythonfs.py index a1f05a07ef..31a4f76118 100644 --- a/tests/common/storages/implementations/test_gitpythonfs.py +++ b/tests/common/storages/implementations/test_gitpythonfs.py @@ -173,7 +173,7 @@ def test_ls_file_details(repo_fixture: Iterator[Any]) -> None: assert isinstance( details["mode"], str ), "Should be a string representation of octal, without the 0o prefix." - assert isinstance(details["committed_date"], int) + assert isinstance(details["mtime"], int) def test_git_refs(repo_fixture: Iterator[Any]) -> None: diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index 1987d1274d..fd4592d897 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -13,7 +13,7 @@ from dlt.common.storages import fsspec_from_config, FilesystemConfiguration from dlt.common.storages.fsspec_filesystem import ( register_implementation_in_fsspec, - MTIME_DISPATCH, + extract_mtime, glob_files, ) from dlt.common.utils import uniq_id @@ -96,7 +96,7 @@ def check_file_exists(): def check_file_changed(): details = filesystem.info(file_url) assert details["size"] == 11 - assert (MTIME_DISPATCH[config.protocol](details) - now).seconds < 60 + assert (extract_mtime(details, config.protocol) - now).seconds < 60 bucket_url = os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] config = get_config() From 87ec2564485463e65a47d0f98d6cb72733b31219 Mon Sep 17 00:00:00 2001 From: Dean Jackson 
<57651082+deanja@users.noreply.github.com> Date: Thu, 15 Feb 2024 16:33:06 +1100 Subject: [PATCH 5/8] add default mtime --- dlt/common/storages/fsspec_filesystem.py | 26 ++++++++++-------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 036a86c225..0f6b31118d 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -306,27 +306,23 @@ def guess_mime_type(file_name: str) -> Sequence[str]: def extract_mtime(file_metadata: Dict[str, Any], protocol: str = None) -> pendulum.DateTime: """Extract the modification time from file listing metadata. + If a protocol is not provided, None or not a known protocol, + then default field name `mtime` is tried. If there's no `mtime` field, + the current time is returned. + + `mtime` is used for the "file" fsspec implementation and our custom fsspec implementations. + `mtime` is common terminology in unix-like systems. + Args: file_metadata (Dict[str, Any]): The file metadata. - protocol (str) [Optional]: The protocol. If not provided, None or not a known protocol, - then default field name `mtime` is tried. `mtime` is used for the "file" fsspec - implementation and our custom fsspec implementations. + protocol (str) [Optional]: The protocol. Returns: - pendulum.DateTime: The modification time. - - Raises: - KeyError: If the resolved field name is not found in the metadata. Current dlt use-cases - depend on a modified date. For example, transactional files, incremental destination - loading. + pendulum.DateTime: The latest modification time. Defaults to `now()` if no suitable + field is found in the metadata. """ field_name = MTIME_FIELD_NAMES.get(protocol, DEFAULT_MTIME_FIELD_NAME) - try: - return ensure_pendulum_datetime(file_metadata[field_name]) - except KeyError: - if protocol not in MTIME_FIELD_NAMES: - extra_message = " {DEFAULT_MTIME_FIELD_NAME} was used by default." 
- raise KeyError(f"`{field_name}` not found in metadata.{extra_message}") + return ensure_pendulum_datetime(file_metadata.get(field_name, pendulum.now())) def glob_files( From 5d644fcb5d33a9400c4a7401f7fce06c302d644a Mon Sep 17 00:00:00 2001 From: Dean Jackson <57651082+deanja@users.noreply.github.com> Date: Thu, 15 Feb 2024 17:40:52 +1100 Subject: [PATCH 6/8] move gitpythonfs --- dlt/common/storages/{implementations => fsspecs}/gitpythonfs.py | 2 +- tests/common/storages/{implementations => fsspecs}/__init__.py | 0 tests/common/storages/{implementations => fsspecs}/conftest.py | 0 .../storages/{implementations => fsspecs}/test_gitpythonfs.py | 2 +- 4 files changed, 2 insertions(+), 2 deletions(-) rename dlt/common/storages/{implementations => fsspecs}/gitpythonfs.py (99%) rename tests/common/storages/{implementations => fsspecs}/__init__.py (100%) rename tests/common/storages/{implementations => fsspecs}/conftest.py (100%) rename tests/common/storages/{implementations => fsspecs}/test_gitpythonfs.py (99%) diff --git a/dlt/common/storages/implementations/gitpythonfs.py b/dlt/common/storages/fsspecs/gitpythonfs.py similarity index 99% rename from dlt/common/storages/implementations/gitpythonfs.py rename to dlt/common/storages/fsspecs/gitpythonfs.py index fceb9af3b0..b5d3299d3a 100644 --- a/dlt/common/storages/implementations/gitpythonfs.py +++ b/dlt/common/storages/fsspecs/gitpythonfs.py @@ -17,7 +17,7 @@ def register_implementation_in_fsspec() -> None: """ register_implementation( "gitpythonfs", - "dlt.common.storages.implementations.gitpythonfs.GitPythonFileSystem", + "dlt.common.storages.fsspecs.gitpythonfs.GitPythonFileSystem", clobber=True, errtxt="Please install gitpythonfs to access GitPythonFileSystem", ) diff --git a/tests/common/storages/implementations/__init__.py b/tests/common/storages/fsspecs/__init__.py similarity index 100% rename from tests/common/storages/implementations/__init__.py rename to tests/common/storages/fsspecs/__init__.py diff --git 
a/tests/common/storages/implementations/conftest.py b/tests/common/storages/fsspecs/conftest.py similarity index 100% rename from tests/common/storages/implementations/conftest.py rename to tests/common/storages/fsspecs/conftest.py diff --git a/tests/common/storages/implementations/test_gitpythonfs.py b/tests/common/storages/fsspecs/test_gitpythonfs.py similarity index 99% rename from tests/common/storages/implementations/test_gitpythonfs.py rename to tests/common/storages/fsspecs/test_gitpythonfs.py index 31a4f76118..2a412c5a08 100644 --- a/tests/common/storages/implementations/test_gitpythonfs.py +++ b/tests/common/storages/fsspecs/test_gitpythonfs.py @@ -16,7 +16,7 @@ from git import Repo, BadName -from dlt.common.storages.implementations.gitpythonfs import ( +from dlt.common.storages.fsspecs.gitpythonfs import ( GitPythonFileSystem, register_implementation_in_fsspec, get_revisions_all_raw, From 62790c1a4ec6851f048e1d0f27191a0b8b7c532b Mon Sep 17 00:00:00 2001 From: Dean Jackson <57651082+deanja@users.noreply.github.com> Date: Thu, 15 Feb 2024 17:41:29 +1100 Subject: [PATCH 7/8] move dynamic registration --- dlt/common/storages/fsspec_filesystem.py | 4 +- ...ilesystem.py => test_fsspec_filesystem.py} | 46 ++++++++++++++++++- .../load/filesystem/test_filesystem_common.py | 41 ----------------- 3 files changed, 47 insertions(+), 44 deletions(-) rename tests/common/storages/{test_local_filesystem.py => test_fsspec_filesystem.py} (54%) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 0f6b31118d..043356a73f 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -86,7 +86,7 @@ class FileItem(TypedDict, total=False): "errtxt": "Please install gdrivefs to access GoogleDriveFileSystem", }, "gitpythonfs": { - "fq_classname": "dlt.common.storages.implementations.gitpythonfs.GitPythonFileSystem", + "fq_classname": 
"dlt.common.storages.fsspecs.gitpythonfs.GitPythonFileSystem", "errtxt": "Please install gitpythonfs to access GitPythonFileSystem", }, } @@ -310,7 +310,7 @@ def extract_mtime(file_metadata: Dict[str, Any], protocol: str = None) -> pendul then default field name `mtime` is tried. If there's no `mtime` field, the current time is returned. - `mtime` is used for the "file" fsspec implementation and our custom fsspec implementations. + `mtime` is used for the "file" fsspec implementation and our custom fsspec implementations. `mtime` is common terminology in unix-like systems. Args: diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_fsspec_filesystem.py similarity index 54% rename from tests/common/storages/test_local_filesystem.py rename to tests/common/storages/test_fsspec_filesystem.py index ea6adec2c7..318e9439d0 100644 --- a/tests/common/storages/test_local_filesystem.py +++ b/tests/common/storages/test_fsspec_filesystem.py @@ -4,7 +4,11 @@ import pathlib from dlt.common.storages import fsspec_from_config, FilesystemConfiguration -from dlt.common.storages.fsspec_filesystem import FileItemDict, glob_files +from dlt.common.storages.fsspec_filesystem import ( + FileItemDict, + glob_files, + register_implementation_in_fsspec, +) from tests.common.storages.utils import assert_sample_files @@ -54,3 +58,43 @@ def test_filesystem_decompress() -> None: # read as uncompressed binary with file_dict.open(compression="enable") as f: assert f.read().startswith(b'"1200864931","2015-07-01 00:00:13"') + + +def test_register_implementation_in_fsspec() -> None: + """Test registering a filesystem implementation with fsspec.""" + # ToDo make test more focused and safe with mock. Just want a unit test. 
+ from fsspec.registry import known_implementations, available_protocols, register_implementation + + protocol = "dummyfs" + previous_registration_existed = False + + # setup + if protocol in known_implementations: + backup = known_implementations.pop(protocol) + previous_registration_existed = True + + assert ( + protocol not in known_implementations + ), f"As a test precondition, {protocol} should not be registered." + + # do and test + register_implementation_in_fsspec(protocol) + assert protocol in available_protocols(), f"{protocol} should be registered." + + # teardown + if previous_registration_existed: + register_implementation(protocol, backup, clobber=True) + assert ( + protocol in available_protocols() + ), f"After teardown, {protocol} should be registered, which was the original state." + else: + known_implementations.pop(protocol) + assert ( + protocol not in known_implementations + ), f"After teardown, {protocol} should not be registered, which was the original state." + + +def test_register_unsupported_raises() -> None: + """Test registering an unsupported filesystem implementation with fsspec raises an error.""" + with pytest.raises(ValueError, match=r"Unknown protocol: 'unsupportedfs_t7Y8'"): + register_implementation_in_fsspec("unsupportedfs_t7Y8") diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index fd4592d897..bfc3800fc0 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -12,7 +12,6 @@ from dlt.common.configuration.specs import AzureCredentials, AzureCredentialsWithoutDefaults from dlt.common.storages import fsspec_from_config, FilesystemConfiguration from dlt.common.storages.fsspec_filesystem import ( - register_implementation_in_fsspec, extract_mtime, glob_files, ) @@ -46,46 +45,6 @@ def test_filesystem_configuration() -> None: } -def test_register_implementation_in_fsspec() -> None: - """Test registering a 
filesystem implementation with fsspec.""" - # ToDo make test more focused and safe with mock. Just want a unit test. - from fsspec.registry import known_implementations, available_protocols, register_implementation - - protocol = "dummyfs" - previous_registration_existed = False - - # setup - if protocol in known_implementations: - backup = known_implementations.pop(protocol) - previous_registration_existed = True - - assert ( - protocol not in known_implementations - ), f"As a test precondition, {protocol} should not be registered." - - # do and test - register_implementation_in_fsspec(protocol) - assert protocol in available_protocols(), f"{protocol} should be registered." - - # teardown - if previous_registration_existed: - register_implementation(protocol, backup, clobber=True) - assert ( - protocol in available_protocols() - ), f"After teardown, {protocol} should not be registered, which was the original state." - else: - known_implementations.pop(protocol) - assert ( - protocol not in known_implementations - ), f"After teardown, {protocol} should not be registered, which was the original state." 
- - -def test_register_unsupported_raises() -> None: - """Test registering an unsupported filesystem implementation with fsspec raises an error.""" - with pytest.raises(ValueError, match=r"Unknown protocol: 'unsupportedfs_t7Y8'"): - register_implementation_in_fsspec("unsupportedfs_t7Y8") - - def test_filesystem_instance(with_gdrive_buckets_env: str) -> None: @retry(stop=stop_after_attempt(10), wait=wait_fixed(1)) def check_file_exists(): From b473fb5a0c5def2ec562b65c4fd8d717bdd4f228 Mon Sep 17 00:00:00 2001 From: Dean Jackson <57651082+deanja@users.noreply.github.com> Date: Fri, 16 Feb 2024 21:47:40 +1100 Subject: [PATCH 8/8] use distinct filesystem instances in pipeline threads - clearer naming in FileItemDict --- dlt/common/storages/fsspec_filesystem.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 043356a73f..218b9904b8 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -199,29 +199,31 @@ class FileItemDict(DictStrAny): def __init__( self, mapping: FileItem, - credentials: Optional[Union[FileSystemCredentials, AbstractFileSystem]] = None, + fs_details: Optional[Union[AbstractFileSystem, FilesystemConfiguration, FileSystemCredentials]] = None, ): """Create a dictionary with the filesystem client. Args: mapping (FileItem): The file item TypedDict. - credentials (Optional[FileSystemCredentials], optional): The credentials to the + fs_details (Optional[AbstractFileSystem, FilesystemConfiguration, FileSystemCredentials], optional): Details to help get a filesystem. Defaults to None. """ - self.credentials = credentials + self.fs_details = fs_details super().__init__(**mapping) @property def fsspec(self) -> AbstractFileSystem: - """The filesystem client is based on the given credentials. + """The filesystem client is based on the given details. 
Returns: - AbstractFileSystem: The fsspec client. + AbstractFileSystem: An fsspec client. """ - if isinstance(self.credentials, AbstractFileSystem): - return self.credentials + if isinstance(self.fs_details, AbstractFileSystem): + return self.fs_details + elif isinstance(self.fs_details, FilesystemConfiguration): + return fsspec_from_config(self.fs_details)[0] else: - return fsspec_filesystem(self["file_url"], self.credentials)[0] + return fsspec_filesystem(self["file_url"], self.fs_details)[0] def open( # noqa: A003 self,