diff --git a/pyproject.toml b/pyproject.toml
index f2cf9895d..a83f33a92 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -12,7 +12,8 @@ packages = [{include = "sources"}]
 
 [tool.poetry.dependencies]
 python = ">=3.8.1,<3.13"
-dlt = {version = "0.4.4", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}
+# dlt = {version = "0.4.4", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}
+dlt = {git = "https://github.com/deanja/dlt.git", branch = "add-git-to-filesystem-source-301", extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}
 
 [tool.poetry.group.dev.dependencies]
 mypy = "1.6.1"
diff --git a/sources/filesystem/__init__.py b/sources/filesystem/__init__.py
index c6452543e..2b08ea03d 100644
--- a/sources/filesystem/__init__.py
+++ b/sources/filesystem/__init__.py
@@ -2,10 +2,13 @@
 from typing import Iterator, List, Optional, Tuple, Union
 
 import dlt
-from dlt.common.typing import copy_sig
+from dlt.common.typing import copy_sig, DictStrAny
 from dlt.sources import DltResource
-from dlt.sources.filesystem import FileItem, FileItemDict, fsspec_filesystem, glob_files
+from dlt.sources.filesystem import FileItem, FileItemDict, glob_files
 from dlt.sources.credentials import FileSystemCredentials
+from dlt.common.storages.fsspec_filesystem import fsspec_from_config
+from dlt.common.storages.configuration import FilesystemConfiguration
+
 
 from .helpers import (
     AbstractFileSystem,
@@ -26,6 +29,7 @@ def readers(
     bucket_url: str = dlt.secrets.value,
     credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value,
     file_glob: Optional[str] = "*",
+    kwargs: Optional[DictStrAny] = None,
 ) -> Tuple[DltResource, ...]:
     """This source provides a few resources that are chunked file readers. Readers can be further parametrized before use
        read_csv(chunksize, **pandas_kwargs)
@@ -38,13 +42,28 @@
         file_glob (str, optional): The filter to apply to the files in glob format. by default lists all files in bucket_url non-recursively
     """
     return (
-        filesystem(bucket_url, credentials, file_glob=file_glob)
+        filesystem(
+            bucket_url,
+            credentials,
+            file_glob=file_glob,
+            kwargs=kwargs,
+        )
         | dlt.transformer(name="read_csv")(_read_csv),
-        filesystem(bucket_url, credentials, file_glob=file_glob)
+        filesystem(
+            bucket_url,
+            credentials,
+            file_glob=file_glob,
+            kwargs=kwargs,
+        )
         | dlt.transformer(name="read_jsonl")(_read_jsonl),
-        filesystem(bucket_url, credentials, file_glob=file_glob)
+        filesystem(
+            bucket_url,
+            credentials,
+            file_glob=file_glob,
+            kwargs=kwargs,
+        )
         | dlt.transformer(name="read_parquet")(_read_parquet),
-        filesystem(bucket_url, credentials, file_glob=file_glob)
+        filesystem(bucket_url, credentials, file_glob=file_glob, kwargs=kwargs)
         | dlt.transformer(name="read_csv_duckdb")(_read_csv_duckdb),
     )
 
@@ -58,6 +77,7 @@ def filesystem(
     file_glob: Optional[str] = "*",
     files_per_page: int = DEFAULT_CHUNK_SIZE,
     extract_content: bool = False,
+    kwargs: Optional[DictStrAny] = None,
 ) -> Iterator[List[FileItem]]:
     """This resource lists files in `bucket_url` using `file_glob` pattern. The files are yielded as FileItem which also
     provide methods to open and read file data. It should be combined with transformers that further process (ie. load files)
@@ -73,14 +93,15 @@
     Returns:
         Iterator[List[FileItem]]: The list of files.
     """
+    fs_config = FilesystemConfiguration(bucket_url, credentials, kwargs=kwargs)
     if isinstance(credentials, AbstractFileSystem):
         fs_client = credentials
     else:
-        fs_client = fsspec_filesystem(bucket_url, credentials)[0]
+        fs_client = fsspec_from_config(fs_config)[0]
 
     files_chunk: List[FileItem] = []
     for file_model in glob_files(fs_client, bucket_url, file_glob):
-        file_dict = FileItemDict(file_model, credentials)
+        file_dict = FileItemDict(file_model, fs_config)
         if extract_content:
             file_dict["file_content"] = file_dict.read_bytes()
         files_chunk.append(file_dict)  # type: ignore
diff --git a/tests/filesystem/conftest.py b/tests/filesystem/conftest.py
new file mode 100644
index 000000000..091b6c9dd
--- /dev/null
+++ b/tests/filesystem/conftest.py
@@ -0,0 +1,83 @@
+from typing import Union, Any, Iterator
+
+import pytest
+
+import os
+import subprocess
+import shutil
+import pathlib
+from .settings import (
+    TEST_SAMPLES_PATH,
+    REPO_FIXTURE_PATH,
+    REPO_SAFE_PREFIX,
+    REPO_GOOD_REF,
+)
+
+
+@pytest.fixture(scope="module", autouse=True)
+def repo_fixture(
+    repo_path: Union[str, os.PathLike] = REPO_FIXTURE_PATH,
+    samples_path: Union[str, os.PathLike] = TEST_SAMPLES_PATH,
+    safe_prefix: str = REPO_SAFE_PREFIX,
+    good_ref: str = REPO_GOOD_REF,
+) -> Iterator[Any]:
+    """Create a temporary git repository to test git-based filesystems.
+
+    Args:
+        repo_path (str): The path at which to create the temporary repo. Defaults to REPO_FIXTURE_PATH. It is safest
+            to create the repo outside your software project's directory tree so it does not interfere with real repos.
+        samples_path (str): The path to the sample files, which will be added to the repo. Defaults to TEST_SAMPLES_PATH.
+        safe_prefix (str): Helps prevent mixups between the test repo and real repos. Defaults to REPO_SAFE_PREFIX.
+        good_ref (str): The git ref for the unmodified sample files. Later commits intentionally break the sample
+            files so that tests will fail if the system under test doesn't correctly handle refs.
+            Defaults to REPO_GOOD_REF.
+
+    Yields:
+        Tuple[str, str]: A tuple containing the repo_path and good_ref.
+
+    """
+    repo_path = pathlib.Path(repo_path)
+    samples_path = pathlib.Path(samples_path)
+
+    try:
+        try:
+            os.mkdir(repo_path)
+        except FileExistsError:
+            raise FileExistsError(
+                f"Directory `{repo_path.absolute()}` already exists. "
+                "It should have been removed by the previous test run."
+            )
+
+        # NOTE: `git init -b` option requires git 2.28 or later.
+        subprocess.call(f"git init -b {safe_prefix}master", shell=True, cwd=repo_path)
+        subprocess.call(
+            "git config user.email 'you@example.com'", shell=True, cwd=repo_path
+        )
+        subprocess.call("git config user.name 'Your Name'", shell=True, cwd=repo_path)
+        shutil.copytree(
+            samples_path, repo_path / f"{safe_prefix}samples", dirs_exist_ok=False
+        )
+        subprocess.call("git add --all", shell=True, cwd=repo_path)
+        subprocess.call(
+            "git commit -m 'add standard sample files for tests'",
+            shell=True,
+            cwd=repo_path,
+        )
+        subprocess.call(
+            f"git tag -a {good_ref} -m 'The sample test files with no modifications'",
+            shell=True,
+            cwd=repo_path,
+        )
+        subprocess.call(
+            "git mv sample.txt sample_renamed.txt",
+            shell=True,
+            cwd=repo_path / f"{safe_prefix}samples",
+        )
+        subprocess.call(
+            "git commit -m 'rename sample.txt to make primitive test fail if at HEAD'",
+            shell=True,
+            cwd=repo_path,
+        )
+        yield repo_path, good_ref
+    finally:
+        shutil.rmtree(repo_path)
diff --git a/tests/filesystem/settings.py b/tests/filesystem/settings.py
index a442052b3..a7c3db1dd 100644
--- a/tests/filesystem/settings.py
+++ b/tests/filesystem/settings.py
@@ -1,13 +1,35 @@
-import os
+from typing import Union
+from os import PathLike
+from pathlib import Path
+from tempfile import gettempdir
 
-TESTS_BUCKET_URLS = [
-    os.path.abspath("tests/filesystem/samples"),
-    "s3://dlt-ci-test-bucket/standard_source/samples",
-    "gs://ci-test-bucket/standard_source/samples",
-    "az://dlt-ci-test-bucket/standard_source/samples",
+
+TEST_SAMPLES_PATH: str = "tests/filesystem/samples"
+REPO_FIXTURE_PATH: Union[str, PathLike] = Path(
+    gettempdir(), "dlt_test_repo_t8hY3x"
+).absolute()
+REPO_SAFE_PREFIX: str = "test-"
+REPO_GOOD_REF = "good-ref"
+
+FACTORY_ARGS = [
+    {"bucket_url": str(Path(TEST_SAMPLES_PATH).absolute())},
+    {"bucket_url": "s3://dlt-ci-test-bucket/standard_source/samples"},
+    {"bucket_url": "gs://ci-test-bucket/standard_source/samples"},
+    {"bucket_url": "az://dlt-ci-test-bucket/standard_source/samples"},
+    {
+        "bucket_url": f"gitpythonfs://{REPO_SAFE_PREFIX}samples",
+        "kwargs": {
+            "repo_path": REPO_FIXTURE_PATH,
+            "ref": REPO_GOOD_REF,
+        },
+    },
 ]
 
-GLOB_RESULTS = [
+FACTORY_TEST_IDS = [
+    f"url={factory_args['bucket_url'][:15]}..." for factory_args in FACTORY_ARGS
+]
+
+GLOBS = [
     {
         "glob": None,
         "file_names": ["sample.txt"],
@@ -66,3 +88,5 @@
         "file_names": ["sample.txt"],
     },
 ]
+
+GLOB_TEST_IDS = [f"glob={glob_result['glob']}" for glob_result in GLOBS]
diff --git a/tests/filesystem/test_filesystem.py b/tests/filesystem/test_filesystem.py
index 550f4c29d..3dba411a1 100644
--- a/tests/filesystem/test_filesystem.py
+++ b/tests/filesystem/test_filesystem.py
@@ -18,33 +18,47 @@
     assert_query_data,
     TEST_STORAGE_ROOT,
 )
+from tests.filesystem.utils import unpack_factory_args
 
-from .settings import GLOB_RESULTS, TESTS_BUCKET_URLS
+from .settings import GLOBS, GLOB_TEST_IDS, FACTORY_ARGS, FACTORY_TEST_IDS
 
 
-@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS)
-@pytest.mark.parametrize("glob_params", GLOB_RESULTS)
-def test_file_list(bucket_url: str, glob_params: Dict[str, Any]) -> None:
+@pytest.mark.parametrize("factory_args", FACTORY_ARGS, ids=FACTORY_TEST_IDS)
+@pytest.mark.parametrize("globs", GLOBS, ids=GLOB_TEST_IDS)
+def test_file_list(factory_args: Dict[str, Any], globs: Dict[str, Any]) -> None:
+    bucket_url, kwargs = unpack_factory_args(factory_args)
+
     @dlt.transformer
     def bypass(items) -> str:
         return items
 
-    # we just pass the glob parameter to the resource if it is not None
-    if file_glob := glob_params["glob"]:
-        filesystem_res = filesystem(bucket_url=bucket_url, file_glob=file_glob) | bypass
+    # we only pass the glob parameter to the resource if it is not None
+    if file_glob := globs["glob"]:
+        filesystem_res = (
+            filesystem(
+                bucket_url=bucket_url,
+                file_glob=file_glob,
+                kwargs=kwargs,
+            )
+            | bypass
+        )
     else:
-        filesystem_res = filesystem(bucket_url=bucket_url) | bypass
+        filesystem_res = filesystem(bucket_url=bucket_url, kwargs=kwargs) | bypass
 
     all_files = list(filesystem_res)
     file_count = len(all_files)
     file_names = [item["file_name"] for item in all_files]
-    assert file_count == len(glob_params["file_names"])
-    assert file_names == glob_params["file_names"]
+    assert file_count == len(globs["file_names"])
+    assert file_names == globs["file_names"]
 
 
 @pytest.mark.parametrize("extract_content", [True, False])
-@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS)
-def test_load_content_resources(bucket_url: str, extract_content: bool) -> None:
+@pytest.mark.parametrize("factory_args", FACTORY_ARGS, ids=FACTORY_TEST_IDS)
+def test_load_content_resources(
+    factory_args: Dict[str, Any], extract_content: bool
+) -> None:
+    bucket_url, kwargs = unpack_factory_args(factory_args)
+
     @dlt.transformer
     def assert_sample_content(items: List[FileItem]):
         # expect just one file
@@ -53,7 +67,7 @@ def assert_sample_content(items: List[FileItem]):
         content = item.read_bytes()
         assert content == b"dlthub content"
         assert item["size_in_bytes"] == 14
-        assert item["file_url"].endswith("/samples/sample.txt")
+        assert item["file_url"].endswith("samples/sample.txt")
         assert item["mime_type"] == "text/plain"
         assert isinstance(item["modification_date"], pendulum.DateTime)
 
@@ -65,6 +79,7 @@ def assert_sample_content(items: List[FileItem]):
             bucket_url=bucket_url,
             file_glob="sample.txt",
             extract_content=extract_content,
+            kwargs=kwargs,
         )
         | assert_sample_content
     )
@@ -78,12 +93,16 @@ def assert_csv_file(item: FileItem):
         # on windows when checking out, git will convert lf into cr+lf so we have more bytes (+ number of lines: 25)
         assert item["size_in_bytes"] in (742, 767)
         assert item["file_name"] == "met_csv/A801/A881_20230920.csv"
-        assert item["file_url"].endswith("/samples/met_csv/A801/A881_20230920.csv")
+        assert item["file_url"].endswith("samples/met_csv/A801/A881_20230920.csv")
         assert item["mime_type"] == "text/csv"
         # print(item)
         return item
 
-    nested_file = filesystem(bucket_url, file_glob="met_csv/A801/A881_20230920.csv")
+    nested_file = filesystem(
+        bucket_url,
+        file_glob="met_csv/A801/A881_20230920.csv",
+        kwargs=kwargs,
+    )
 
     assert len(list(nested_file | assert_csv_file)) == 1
 
@@ -101,10 +120,12 @@ def test_fsspec_as_credentials():
     print(list(gs_resource))
 
 
-@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS)
-def test_csv_transformers(bucket_url: str) -> None:
+@pytest.mark.parametrize("factory_args", FACTORY_ARGS, ids=FACTORY_TEST_IDS)
+def test_csv_transformers(factory_args: Dict[str, Any]) -> None:
     from sources.filesystem_pipeline import read_csv
 
+    bucket_url, kwargs = unpack_factory_args(factory_args)
+
     pipeline = dlt.pipeline(
         pipeline_name="file_data",
         destination="duckdb",
@@ -114,7 +135,12 @@
 
     # load all csvs merging data on a date column
     met_files = (
-        filesystem(bucket_url=bucket_url, file_glob="met_csv/A801/*.csv") | read_csv()
+        filesystem(
+            bucket_url=bucket_url,
+            file_glob="met_csv/A801/*.csv",
+            kwargs=kwargs,
+        )
+        | read_csv()
     )
     met_files.apply_hints(write_disposition="merge", merge_key="date")
     load_info = pipeline.run(met_files.with_name("met_csv"))
@@ -126,7 +152,12 @@
     # load the other folder that contains data for the same day + one other day
     # the previous data will be replaced
     met_files = (
-        filesystem(bucket_url=bucket_url, file_glob="met_csv/A803/*.csv") | read_csv()
+        filesystem(
+            bucket_url=bucket_url,
+            file_glob="met_csv/A803/*.csv",
+            kwargs=kwargs,
+        )
+        | read_csv()
     )
     met_files.apply_hints(write_disposition="merge", merge_key="date")
     load_info = pipeline.run(met_files.with_name("met_csv"))
@@ -139,16 +170,23 @@
     assert load_table_counts(pipeline, "met_csv") == {"met_csv": 48}
 
 
-@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS)
-def test_standard_readers(bucket_url: str) -> None:
+@pytest.mark.parametrize("factory_args", FACTORY_ARGS, ids=FACTORY_TEST_IDS)
+def test_standard_readers(factory_args: Dict[str, Any]) -> None:
+    bucket_url, kwargs = unpack_factory_args(factory_args)
+
     # extract pipes with standard readers
-    jsonl_reader = readers(bucket_url, file_glob="**/*.jsonl").read_jsonl()
-    parquet_reader = readers(bucket_url, file_glob="**/*.parquet").read_parquet()
-    # also read zipped csvs
-    csv_reader = readers(bucket_url, file_glob="**/*.csv*").read_csv(
+    jsonl_reader = readers(
+        bucket_url, file_glob="**/*.jsonl", kwargs=kwargs
+    ).read_jsonl()
+    parquet_reader = readers(
+        bucket_url, file_glob="**/*.parquet", kwargs=kwargs
+    ).read_parquet()
+    csv_reader = readers(bucket_url, file_glob="**/*.csv*", kwargs=kwargs).read_csv(
         float_precision="high"
     )
-    csv_duckdb_reader = readers(bucket_url, file_glob="**/*.csv*").read_csv_duckdb()
+    csv_duckdb_reader = readers(
+        bucket_url, file_glob="**/*.csv*", kwargs=kwargs
+    ).read_csv_duckdb()
 
     # a step that copies files into test storage
     def _copy(item: FileItemDict):
@@ -161,7 +199,7 @@ def _copy(item: FileItemDict):
         # return file item unchanged
         return item
 
-    downloader = filesystem(bucket_url, file_glob="**").add_map(_copy)
+    downloader = filesystem(bucket_url, file_glob="**", kwargs=kwargs).add_map(_copy)
 
     # load in single pipeline
     pipeline = dlt.pipeline(
@@ -200,12 +238,14 @@
     # print(pipeline.default_schema.to_pretty_yaml())
 
 
-@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS)
-def test_incremental_load(bucket_url: str) -> None:
+@pytest.mark.parametrize("factory_args", FACTORY_ARGS, ids=FACTORY_TEST_IDS)
+def test_incremental_load(factory_args: Dict[str, Any]) -> None:
     @dlt.transformer
     def bypass(items) -> str:
         return items
 
+    bucket_url, kwargs = unpack_factory_args(factory_args)
+
     pipeline = dlt.pipeline(
         pipeline_name="file_data",
         destination="duckdb",
@@ -214,7 +254,11 @@ def bypass(items) -> str:
     )
 
     # Load all files
-    all_files = filesystem(bucket_url=bucket_url, file_glob="csv/*")
+    all_files = filesystem(
+        bucket_url=bucket_url,
+        file_glob="csv/*",
+        kwargs=kwargs,
+    )
     # add incremental on modification time
     all_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
     load_info = pipeline.run((all_files | bypass).with_name("csv_files"))
@@ -225,7 +269,11 @@ def bypass(items) -> str:
     assert table_counts["csv_files"] == 4
 
     # load again
-    all_files = filesystem(bucket_url=bucket_url, file_glob="csv/*")
+    all_files = filesystem(
+        bucket_url=bucket_url,
+        file_glob="csv/*",
+        kwargs=kwargs,
+    )
     all_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
     load_info = pipeline.run((all_files | bypass).with_name("csv_files"))
     # nothing into csv_files
@@ -234,7 +282,11 @@ def bypass(items) -> str:
     assert table_counts["csv_files"] == 4
 
     # load again into different table
-    all_files = filesystem(bucket_url=bucket_url, file_glob="csv/*")
+    all_files = filesystem(
+        bucket_url=bucket_url,
+        file_glob="csv/*",
+        kwargs=kwargs,
+    )
    all_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
     load_info = pipeline.run((all_files | bypass).with_name("csv_files_2"))
     assert_load_info(load_info)
@@ -243,7 +295,7 @@
 
 def test_file_chunking() -> None:
     resource = filesystem(
-        bucket_url=TESTS_BUCKET_URLS[0],
+        bucket_url=FACTORY_ARGS[0]["bucket_url"],
         file_glob="*/*.csv",
         files_per_page=2,
     )
diff --git a/tests/filesystem/utils.py b/tests/filesystem/utils.py
new file mode 100644
index 000000000..4eb62bd74
--- /dev/null
+++ b/tests/filesystem/utils.py
@@ -0,0 +1,6 @@
+from typing import Any, Dict, List
+
+
+def unpack_factory_args(factory_args: Dict[str, Any]) -> List[Any]:
+    """Unpacks filesystem factory arguments from pytest parameters."""
+    return [factory_args.get(k) for k in ("bucket_url", "kwargs")]
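Reviewer note (not part of the patch): a minimal sketch of how the new `kwargs` pass-through could be used from a pipeline script, assuming the pinned dlt branch in pyproject.toml provides the gitpythonfs protocol. The repo path, ref, and pipeline names below are placeholders chosen to mirror the keys used in FACTORY_ARGS; they are illustrative only.

import dlt

from sources.filesystem import readers

# Sketch: read CSVs from a local git repository at a pinned ref. The
# "repo_path" and "ref" values are placeholders and are forwarded through
# `kwargs` to the underlying fsspec gitpythonfs filesystem.
csv_from_git = readers(
    bucket_url="gitpythonfs://samples",
    file_glob="**/*.csv",
    kwargs={"repo_path": "/path/to/local/repo", "ref": "good-ref"},
).read_csv()

pipeline = dlt.pipeline(
    pipeline_name="git_filesystem_example",
    destination="duckdb",
    dataset_name="git_files",
)
print(pipeline.run(csv_from_git))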