From 4cd3dfa1d666444466770bb36f93485dc600aeef Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sun, 1 Sep 2024 17:52:37 +0200 Subject: [PATCH] adds filesystem to drop command tests --- .../impl/filesystem/filesystem.py | 12 +++ tests/load/pipeline/test_drop.py | 88 ++++++++++++++++--- 2 files changed, 87 insertions(+), 13 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 1f2c5aceef..f03a1a0c27 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -11,6 +11,7 @@ from dlt.common import logger, time, json, pendulum from dlt.common.destination.utils import resolve_merge_strategy from dlt.common.metrics import LoadJobMetrics +from dlt.common.schema.typing import TTableSchemaColumns from dlt.common.storages.fsspec_filesystem import glob_files from dlt.common.typing import DictStrAny from dlt.common.schema import Schema, TSchemaTables @@ -268,6 +269,17 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None: if fileparts[0] == self.schema.name: self._delete_file(filename) + def get_storage_tables( + self, table_names: Iterable[str] + ) -> Iterable[Tuple[str, TTableSchemaColumns]]: + """Yields tables that have files in storage, does not return column schemas""" + for table_name in table_names: + if len(self.list_table_files(table_name)) > 0: + yield (table_name, {"_column": {}}) + else: + # if no columns we assume that table does not exist + yield (table_name, {}) + def truncate_tables(self, table_names: List[str]) -> None: """Truncate a set of regular tables with given `table_names`""" table_dirs = set(self.get_table_dirs(table_names)) diff --git a/tests/load/pipeline/test_drop.py b/tests/load/pipeline/test_drop.py index 1e4d6ec2ea..f6ddd79b99 100644 --- a/tests/load/pipeline/test_drop.py +++ b/tests/load/pipeline/test_drop.py @@ -6,6 +6,7 @@ import pytest import dlt +from dlt.common.destination.reference import JobClientBase from dlt.extract import DltResource from dlt.common.utils import uniq_id from dlt.pipeline import helpers, state_sync, Pipeline @@ -18,6 +19,7 @@ from dlt.destinations.job_client_impl import SqlJobClientBase from tests.load.utils import destinations_configs, DestinationTestConfiguration +from tests.pipeline.utils import assert_load_info, load_table_counts def _attach(pipeline: Pipeline) -> Pipeline: @@ -124,24 +126,45 @@ def assert_destination_state_loaded(pipeline: Pipeline) -> None: assert pipeline_state == destination_state +@pytest.mark.essential @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + destinations_configs( + default_sql_configs=True, local_filesystem_configs=True, all_buckets_filesystem_configs=True + ), + ids=lambda x: x.name, ) def test_drop_command_resources_and_state(destination_config: DestinationTestConfiguration) -> None: """Test the drop command with resource and state path options and verify correct data is deleted from destination and locally""" source = droppable_source() pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True) - pipeline.run(source, **destination_config.run_kwargs) + info = pipeline.run(source, **destination_config.run_kwargs) + assert_load_info(info) + assert load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) == { + "_dlt_version": 1, + "_dlt_loads": 1, + "droppable_a": 2, + "droppable_b": 1, + "droppable_c": 1, + "droppable_d": 2, + "droppable_no_state": 3, + "_dlt_pipeline_state": 1, + "droppable_b__items": 2, + "droppable_c__items": 1, + "droppable_c__items__labels": 2, + } attached = _attach(pipeline) helpers.drop( - attached, resources=["droppable_c", "droppable_d"], state_paths="data_from_d.*.bar" + attached, + resources=["droppable_c", "droppable_d", "droppable_no_state"], + state_paths="data_from_d.*.bar", ) attached = _attach(pipeline) - assert_dropped_resources(attached, ["droppable_c", "droppable_d"]) + assert_dropped_resources(attached, ["droppable_c", "droppable_d", "droppable_no_state"]) # Verify extra json paths are removed from state sources_state = pipeline.state["sources"] @@ -149,9 +172,32 @@ def test_drop_command_resources_and_state(destination_config: DestinationTestCon assert_destination_state_loaded(pipeline) + # now run the same droppable_source to see if tables are recreated and they contain right number of items + info = pipeline.run(source, **destination_config.run_kwargs) + assert_load_info(info) + # 2 versions (one dropped and replaced with schema with dropped tables, then we added missing tables) + # 3 loads (one for drop) + # droppable_no_state correctly replaced + # all other resources stay at the same count (they are incremental so they got loaded again or not loaded at all ie droppable_a) + assert load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) == { + "_dlt_version": 2, + "_dlt_loads": 3, + "droppable_a": 2, + "droppable_b": 1, + "_dlt_pipeline_state": 3, + "droppable_b__items": 2, + "droppable_c": 1, + "droppable_d": 2, + "droppable_no_state": 3, + "droppable_c__items": 1, + "droppable_c__items__labels": 2, + } + @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + destinations_configs(default_sql_configs=True, local_filesystem_configs=True), + ids=lambda x: x.name, ) def test_drop_command_only_state(destination_config: DestinationTestConfiguration) -> None: """Test drop command that deletes part of the state and syncs with destination""" @@ -174,7 +220,9 @@ def test_drop_command_only_state(destination_config: DestinationTestConfiguratio @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + destinations_configs(default_sql_configs=True, local_filesystem_configs=True), + ids=lambda x: x.name, ) def test_drop_command_only_tables(destination_config: DestinationTestConfiguration) -> None: """Test drop only tables and makes sure that schema and state are synced""" @@ -196,7 +244,9 @@ def test_drop_command_only_tables(destination_config: DestinationTestConfigurati @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + destinations_configs(default_sql_configs=True, local_filesystem_configs=True), + ids=lambda x: x.name, ) def test_drop_destination_tables_fails(destination_config: DestinationTestConfiguration) -> None: """Fail on DROP TABLES in destination init. Command runs again.""" @@ -224,7 +274,9 @@ def test_drop_destination_tables_fails(destination_config: DestinationTestConfig @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + destinations_configs(default_sql_configs=True, local_filesystem_configs=True), + ids=lambda x: x.name, ) def test_fail_after_drop_tables(destination_config: DestinationTestConfiguration) -> None: """Fail directly after drop tables. Command runs again ignoring destination tables missing.""" @@ -255,7 +307,9 @@ def test_fail_after_drop_tables(destination_config: DestinationTestConfiguration @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + destinations_configs(default_sql_configs=True, local_filesystem_configs=True), + ids=lambda x: x.name, ) def test_load_step_fails(destination_config: DestinationTestConfiguration) -> None: """Test idempotence. pipeline.load() fails. Command can be run again successfully""" @@ -278,7 +332,9 @@ def test_load_step_fails(destination_config: DestinationTestConfiguration) -> No @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + destinations_configs(default_sql_configs=True, local_filesystem_configs=True), + ids=lambda x: x.name, ) def test_resource_regex(destination_config: DestinationTestConfiguration) -> None: source = droppable_source() @@ -296,7 +352,9 @@ def test_resource_regex(destination_config: DestinationTestConfiguration) -> Non @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + destinations_configs(default_sql_configs=True, local_filesystem_configs=True), + ids=lambda x: x.name, ) def test_drop_nothing(destination_config: DestinationTestConfiguration) -> None: """No resources, no state keys. Nothing is changed.""" @@ -340,7 +398,9 @@ def test_drop_all_flag(destination_config: DestinationTestConfiguration) -> None @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + destinations_configs(default_sql_configs=True, local_filesystem_configs=True), + ids=lambda x: x.name, ) def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConfiguration) -> None: """Pipeline can be run again after dropping some resources""" @@ -359,7 +419,9 @@ def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConf @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + destinations_configs(default_sql_configs=True, local_filesystem_configs=True), + ids=lambda x: x.name, ) def test_drop_state_only(destination_config: DestinationTestConfiguration) -> None: """Pipeline can be run again after dropping some resources"""