From 36e94afe99c2094aee22e619b1fee62ff1e2c4ef Mon Sep 17 00:00:00 2001
From: Dave
Date: Mon, 24 Jun 2024 17:34:11 +0200
Subject: [PATCH] some work on filesystem destination

---
 dlt/dataset.py                      |  2 +-
 .../impl/filesystem/filesystem.py   | 29 ++++++-
 tests/load/test_read_interfaces.py  | 79 +++++++++++++++----
 3 files changed, 91 insertions(+), 19 deletions(-)

diff --git a/dlt/dataset.py b/dlt/dataset.py
index 475dbdbaed..569422a47f 100644
--- a/dlt/dataset.py
+++ b/dlt/dataset.py
@@ -37,7 +37,7 @@ def _client(self) -> Generator[SupportsDataAccess, None, None]:
             yield client
             return
 
-        raise Exception("Destination does not support data access")
+        raise Exception("Destination does not support data access.")
 
     def df(self, *, sql: str = None, table: str = None, batch_size: int = 1000) -> DataFrame:
         """Get first batch of table as dataframe"""
diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index b1b747ec93..d4d4c52f89 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -561,8 +561,33 @@ def get_table_jobs(
 
     def iter_df(
         self, *, sql: str = None, table: str = None, batch_size: int = 1000
-    ) -> Generator[DataFrame, None, None]: ...
+    ) -> Generator[DataFrame, None, None]:
+        """Provide dataframes via duckdb"""
+        import duckdb
+
+        duckdb.register_filesystem(self.fs_client)
+
+        # create an in-memory table; for now we read all available files
+        db = duckdb.connect(":memory:")
+        files = self.list_table_files(table)
+        protocol = "" if self.is_local_filesystem else f"{self.config.protocol}://"
+        files_string = ",".join([f"'{protocol}{f}'" for f in files])
+        db.sql(f"CREATE TABLE {table} AS SELECT * FROM read_json([{files_string}]);")
+
+        # yield in batches
+        offset = 0
+        while True:
+            df = db.sql(f"SELECT * FROM {table} OFFSET {offset} LIMIT {batch_size}").df()
+            if len(df.index) == 0:
+                break
+            yield df
+            offset += batch_size
 
     def iter_arrow(
         self, *, sql: str = None, table: str = None, batch_size: int = 1000
-    ) -> Generator[ArrowTable, None, None]: ...
+    ) -> Generator[ArrowTable, None, None]:
+        """Default implementation converts df to arrow"""
+
+        # TODO: duckdb supports iterating in batches natively
+        for df in self.iter_df(sql=sql, table=table, batch_size=batch_size):
+            yield ArrowTable.from_pandas(df)
diff --git a/tests/load/test_read_interfaces.py b/tests/load/test_read_interfaces.py
index 36a9700ed8..1309de1933 100644
--- a/tests/load/test_read_interfaces.py
+++ b/tests/load/test_read_interfaces.py
@@ -1,5 +1,6 @@
 import pytest
 import dlt
+import os
 
 from typing import List
 from functools import reduce
@@ -8,35 +9,81 @@
 from pandas import DataFrame
 
 
+@dlt.source()
+def source():
+    @dlt.resource()
+    def items():
+        yield from [{"id": i, "children": [{"id": i + 100}, {"id": i + 1000}]} for i in range(300)]
+
+    @dlt.resource()
+    def items2():
+        yield from [{"id": i, "children": [{"id": i + 100}, {"id": i + 1000}]} for i in range(150)]
+
+    return [items, items2]
+
+
 @pytest.mark.essential
 @pytest.mark.parametrize(
     "destination_config",
     destinations_configs(default_sql_configs=True),
     ids=lambda x: x.name,
 )
-def test_read_interfaces(destination_config: DestinationTestConfiguration) -> None:
-    # we load a table with child table and check wether ibis works
+def test_read_interfaces_sql(destination_config: DestinationTestConfiguration) -> None:
     pipeline = destination_config.setup_pipeline(
         "read_pipeline", dataset_name="read_test", dev_mode=True
     )
 
-    @dlt.source()
-    def source():
-        @dlt.resource()
-        def items():
-            yield from [
-                {"id": i, "children": [{"id": i + 100}, {"id": i + 1000}]} for i in range(300)
-            ]
+    # run source
+    s = source()
+    pipeline.run(
+        s,
+    )
+
+    # get one df
+    df = pipeline.dataset.df(table="items", batch_size=5)
+    assert len(df.index) == 5
+    assert set(df.columns.values) == {"id", "_dlt_load_id", "_dlt_id"}
+
+    # iterate all dataframes
+    frames = []
+    for df in pipeline.dataset.iter_df(table="items", batch_size=70):
+        frames.append(df)
+
+    # check frame count and item counts
+    assert len(frames) == 5
+    assert [len(df.index) for df in frames] == [70, 70, 70, 70, 20]
+
+    # check all items are present
+    ids = reduce(lambda a, b: a + b, [f["id"].to_list() for f in frames])
+    assert set(ids) == set(range(300))
+
+    # basic check of arrow table
+    table = pipeline.dataset.arrow(table="items", batch_size=5)
+    assert set(table.column_names) == {"id", "_dlt_load_id", "_dlt_id"}
+    assert table.num_rows == 5
+
+    # access via resource
+    assert len(s.items.dataset.df().index) == 300
+    assert len(s.items2.dataset.df().index) == 150
+
 
-        @dlt.resource()
-        def items2():
-            yield from [
-                {"id": i, "children": [{"id": i + 100}, {"id": i + 1000}]} for i in range(150)
-            ]
+@pytest.mark.essential
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(
+        local_filesystem_configs=True, all_buckets_filesystem_configs=True
+    ),  # TODO: test all buckets
+    ids=lambda x: x.name,
+)
+def test_read_interfaces_filesystem(destination_config: DestinationTestConfiguration) -> None:
+    # we force multiple files per table; each file may hold at most 50 items
+    os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "50"
 
-        return [items, items2]
+    pipeline = destination_config.setup_pipeline(
+        "read_pipeline", dataset_name="read_test", dev_mode=True
+    )
 
-    # create 300 entries in "items" table
+    # run source
     s = source()
     pipeline.run(
         s,