Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add git to filesystem b 301 #350

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ packages = [{include = "sources"}]

[tool.poetry.dependencies]
python = ">=3.8.1,<3.13"
dlt = {version = "0.4.4", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}
# dlt = {version = "0.4.4", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}
dlt = {git = "https://github.com/deanja/dlt.git", branch = "add-git-to-filesystem-source-301", extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}

[tool.poetry.group.dev.dependencies]
mypy = "1.6.1"
Expand Down
37 changes: 29 additions & 8 deletions sources/filesystem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
from typing import Iterator, List, Optional, Tuple, Union

import dlt
from dlt.common.typing import copy_sig
from dlt.common.typing import copy_sig, DictStrAny
from dlt.sources import DltResource
from dlt.sources.filesystem import FileItem, FileItemDict, fsspec_filesystem, glob_files
from dlt.sources.filesystem import FileItem, FileItemDict, glob_files
from dlt.sources.credentials import FileSystemCredentials
from dlt.common.storages.fsspec_filesystem import fsspec_from_config
from dlt.common.storages.configuration import FilesystemConfiguration


from .helpers import (
AbstractFileSystem,
Expand All @@ -26,6 +29,7 @@ def readers(
bucket_url: str = dlt.secrets.value,
credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value,
file_glob: Optional[str] = "*",
kwargs: Optional[DictStrAny] = None,
) -> Tuple[DltResource, ...]:
"""This source provides a few resources that are chunked file readers. Readers can be further parametrized before use
read_csv(chunksize, **pandas_kwargs)
Expand All @@ -38,13 +42,28 @@ def readers(
file_glob (str, optional): The filter to apply to the files in glob format. by default lists all files in bucket_url non-recursively
"""
return (
filesystem(bucket_url, credentials, file_glob=file_glob)
filesystem(
bucket_url,
credentials,
file_glob=file_glob,
kwargs=kwargs,
)
| dlt.transformer(name="read_csv")(_read_csv),
filesystem(bucket_url, credentials, file_glob=file_glob)
filesystem(
bucket_url,
credentials,
file_glob=file_glob,
kwargs=kwargs,
)
| dlt.transformer(name="read_jsonl")(_read_jsonl),
filesystem(bucket_url, credentials, file_glob=file_glob)
filesystem(
bucket_url,
credentials,
file_glob=file_glob,
kwargs=kwargs,
)
| dlt.transformer(name="read_parquet")(_read_parquet),
filesystem(bucket_url, credentials, file_glob=file_glob)
filesystem(bucket_url, credentials, file_glob=file_glob, kwargs=kwargs)
| dlt.transformer(name="read_csv_duckdb")(_read_csv_duckdb),
)

Expand All @@ -58,6 +77,7 @@ def filesystem(
file_glob: Optional[str] = "*",
files_per_page: int = DEFAULT_CHUNK_SIZE,
extract_content: bool = False,
kwargs: Optional[DictStrAny] = None,
) -> Iterator[List[FileItem]]:
"""This resource lists files in `bucket_url` using `file_glob` pattern. The files are yielded as FileItem which also
provide methods to open and read file data. It should be combined with transformers that further process (ie. load files)
Expand All @@ -73,14 +93,15 @@ def filesystem(
Returns:
Iterator[List[FileItem]]: The list of files.
"""
fs_config = FilesystemConfiguration(bucket_url, credentials, kwargs=kwargs)
if isinstance(credentials, AbstractFileSystem):
fs_client = credentials
else:
fs_client = fsspec_filesystem(bucket_url, credentials)[0]
fs_client = fsspec_from_config(fs_config)[0]

files_chunk: List[FileItem] = []
for file_model in glob_files(fs_client, bucket_url, file_glob):
file_dict = FileItemDict(file_model, credentials)
file_dict = FileItemDict(file_model, fs_config)
if extract_content:
file_dict["file_content"] = file_dict.read_bytes()
files_chunk.append(file_dict) # type: ignore
Expand Down
83 changes: 83 additions & 0 deletions tests/filesystem/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from typing import Union, Any, Iterator

import pytest

import os
import subprocess
import shutil
import pathlib
from .settings import (
TEST_SAMPLES_PATH,
REPO_FIXTURE_PATH,
REPO_SAFE_PREFIX,
REPO_GOOD_REF,
)


@pytest.fixture(scope="module", autouse=True)
def repo_fixture(
deanja marked this conversation as resolved.
Show resolved Hide resolved
repo_path: Union[str, os.PathLike] = REPO_FIXTURE_PATH,
samples_path: Union[str, os.PathLike] = TEST_SAMPLES_PATH,
safe_prefix: str = REPO_SAFE_PREFIX,
good_ref: str = REPO_GOOD_REF,
) -> Iterator[Any]:
"""Create a temporary git repository to test git-based filesystems.

Args:
repo_path (str): The path at which to create the temporary repo. Defaults to REPO_FIXTURE_PATH. It is safest
to create the repo outside your software project's directory tree so it does not interfere with real repos.
samples_path (str): The path to the sample files, which will be added to the repo. Defaults to TEST_SAMPLES_PATH.
safe_prefix (str): Helps prevent mixups between the test repo and real repos. Defaults to REPO_SAFE_PREFIX.
good_ref (str): The git ref for the unmodified sample files. Later commits intentionally break the sample
files so that tests will fail if the system under test doesn't correctly handle refs.
Defaults to REPO_SAFE_REF.

Yields:
Tuple[str, str]: A tuple containing the repo_path and good_ref.

"""
repo_path = pathlib.Path(repo_path)
samples_path = pathlib.Path(samples_path)

try:
try:
os.mkdir(repo_path)
except FileExistsError:
raise FileExistsError(
f"Directory `{repo_path.absolute()}` already exists."
"It should have been removed by the previous test run."
)

# NOTE: `git init -b` option requires git 2.28 or later.
subprocess.call(f"git init -b {safe_prefix}master", shell=True, cwd=repo_path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that somehow more convenient than using gitpython that we have in deps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. For gitpython way, do you mean:

  1. just using git.<command>(...) instead of the above subprocess.call(). Like this https://gitpython.readthedocs.io/en/stable/tutorial.html#using-git-directly ?
  2. Or deeper in the gitpython API? Oh, I think it's read-only.

For me subprocess.call() is easier to read because it looks like git on the command line. I don't want the reader to have to learn a new API to understand the test data. Also it's how I saw test repo done in other fsspec implementations.

The code lines did get kind of long though ☹️ . And gitpython would let other settings be made if needed.

I am happy to try the gitpython way. Let me know.

subprocess.call(
"git config user.email '[email protected]'", shell=True, cwd=repo_path
)
subprocess.call("git config user.name 'Your Name'", shell=True, cwd=repo_path)
shutil.copytree(
samples_path, repo_path / f"{safe_prefix}samples", dirs_exist_ok=False
)
subprocess.call("git add --all", shell=True, cwd=repo_path)
subprocess.call(
"git commit -m 'add standard sample files for tests'",
shell=True,
cwd=repo_path,
)
subprocess.call(
f"git tag -a {good_ref} -m 'The sample test files with no modifications'",
shell=True,
cwd=repo_path,
)
subprocess.call(
"git mv sample.txt sample_renamed.txt",
shell=True,
cwd=repo_path / f"{safe_prefix}samples",
)
subprocess.call(
"git commit -m 'rename samples.txt to make primitive test fail if at HEAD'",
shell=True,
cwd=repo_path,
)
yield repo_path, good_ref
finally:
shutil.rmtree(repo_path)
38 changes: 31 additions & 7 deletions tests/filesystem/settings.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
import os
from typing import Union
from os import PathLike
from pathlib import Path
from tempfile import gettempdir

TESTS_BUCKET_URLS = [
os.path.abspath("tests/filesystem/samples"),
"s3://dlt-ci-test-bucket/standard_source/samples",
"gs://ci-test-bucket/standard_source/samples",
"az://dlt-ci-test-bucket/standard_source/samples",

TEST_SAMPLES_PATH: str = "tests/filesystem/samples"
REPO_FIXTURE_PATH: Union[str, PathLike] = Path(
gettempdir(), "dlt_test_repo_t8hY3x"
).absolute()
REPO_SAFE_PREFIX: str = "test-"
REPO_GOOD_REF = "good-ref"

FACTORY_ARGS = [
{"bucket_url": str(Path(TEST_SAMPLES_PATH).absolute())},
{"bucket_url": "s3://dlt-ci-test-bucket/standard_source/samples"},
deanja marked this conversation as resolved.
Show resolved Hide resolved
{"bucket_url": "gs://ci-test-bucket/standard_source/samples"},
{"bucket_url": "az://dlt-ci-test-bucket/standard_source/samples"},
{
"bucket_url": f"gitpythonfs://{REPO_SAFE_PREFIX}samples",
"kwargs": {
"repo_path": REPO_FIXTURE_PATH,
"ref": REPO_GOOD_REF,
},
},
]

GLOB_RESULTS = [
FACTORY_TEST_IDS = [
f"url={factory_args['bucket_url'][:15]}..." for factory_args in FACTORY_ARGS
]

GLOBS = [
{
"glob": None,
"file_names": ["sample.txt"],
Expand Down Expand Up @@ -66,3 +88,5 @@
"file_names": ["sample.txt"],
},
]

GLOB_TEST_IDS = [f"glob={glob_result['glob']}" for glob_result in GLOBS]
Loading
Loading