Skip to content

Commit

Permalink
adds filesystem to drop command tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Sep 1, 2024
1 parent 4a974b4 commit 4cd3dfa
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 13 deletions.
12 changes: 12 additions & 0 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dlt.common import logger, time, json, pendulum
from dlt.common.destination.utils import resolve_merge_strategy
from dlt.common.metrics import LoadJobMetrics
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.storages.fsspec_filesystem import glob_files
from dlt.common.typing import DictStrAny
from dlt.common.schema import Schema, TSchemaTables
Expand Down Expand Up @@ -268,6 +269,17 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
if fileparts[0] == self.schema.name:
self._delete_file(filename)

def get_storage_tables(
self, table_names: Iterable[str]
) -> Iterable[Tuple[str, TTableSchemaColumns]]:
"""Yields tables that have files in storage, does not return column schemas"""
for table_name in table_names:
if len(self.list_table_files(table_name)) > 0:
yield (table_name, {"_column": {}})
else:
# if no columns we assume that table does not exist
yield (table_name, {})

def truncate_tables(self, table_names: List[str]) -> None:
"""Truncate a set of regular tables with given `table_names`"""
table_dirs = set(self.get_table_dirs(table_names))
Expand Down
88 changes: 75 additions & 13 deletions tests/load/pipeline/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest

import dlt
from dlt.common.destination.reference import JobClientBase
from dlt.extract import DltResource
from dlt.common.utils import uniq_id
from dlt.pipeline import helpers, state_sync, Pipeline
Expand All @@ -18,6 +19,7 @@
from dlt.destinations.job_client_impl import SqlJobClientBase

from tests.load.utils import destinations_configs, DestinationTestConfiguration
from tests.pipeline.utils import assert_load_info, load_table_counts


def _attach(pipeline: Pipeline) -> Pipeline:
Expand Down Expand Up @@ -124,34 +126,78 @@ def assert_destination_state_loaded(pipeline: Pipeline) -> None:
assert pipeline_state == destination_state


@pytest.mark.essential
@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(
default_sql_configs=True, local_filesystem_configs=True, all_buckets_filesystem_configs=True
),
ids=lambda x: x.name,
)
def test_drop_command_resources_and_state(destination_config: DestinationTestConfiguration) -> None:
"""Test the drop command with resource and state path options and
verify correct data is deleted from destination and locally"""
source = droppable_source()
pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True)
pipeline.run(source, **destination_config.run_kwargs)
info = pipeline.run(source, **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) == {
"_dlt_version": 1,
"_dlt_loads": 1,
"droppable_a": 2,
"droppable_b": 1,
"droppable_c": 1,
"droppable_d": 2,
"droppable_no_state": 3,
"_dlt_pipeline_state": 1,
"droppable_b__items": 2,
"droppable_c__items": 1,
"droppable_c__items__labels": 2,
}

attached = _attach(pipeline)
helpers.drop(
attached, resources=["droppable_c", "droppable_d"], state_paths="data_from_d.*.bar"
attached,
resources=["droppable_c", "droppable_d", "droppable_no_state"],
state_paths="data_from_d.*.bar",
)

attached = _attach(pipeline)

assert_dropped_resources(attached, ["droppable_c", "droppable_d"])
assert_dropped_resources(attached, ["droppable_c", "droppable_d", "droppable_no_state"])

# Verify extra json paths are removed from state
sources_state = pipeline.state["sources"]
assert sources_state["droppable"]["data_from_d"] == {"foo1": {}, "foo2": {}}

assert_destination_state_loaded(pipeline)

# now run the same droppable_source to see if tables are recreated and they contain right number of items
info = pipeline.run(source, **destination_config.run_kwargs)
assert_load_info(info)
# 2 versions (one dropped and replaced with schema with dropped tables, then we added missing tables)
# 3 loads (one for drop)
# droppable_no_state correctly replaced
# all other resources stay at the same count (they are incremental so they got loaded again or not loaded at all ie droppable_a)
assert load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) == {
"_dlt_version": 2,
"_dlt_loads": 3,
"droppable_a": 2,
"droppable_b": 1,
"_dlt_pipeline_state": 3,
"droppable_b__items": 2,
"droppable_c": 1,
"droppable_d": 2,
"droppable_no_state": 3,
"droppable_c__items": 1,
"droppable_c__items__labels": 2,
}


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_drop_command_only_state(destination_config: DestinationTestConfiguration) -> None:
"""Test drop command that deletes part of the state and syncs with destination"""
Expand All @@ -174,7 +220,9 @@ def test_drop_command_only_state(destination_config: DestinationTestConfiguratio


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_drop_command_only_tables(destination_config: DestinationTestConfiguration) -> None:
"""Test drop only tables and makes sure that schema and state are synced"""
Expand All @@ -196,7 +244,9 @@ def test_drop_command_only_tables(destination_config: DestinationTestConfigurati


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_drop_destination_tables_fails(destination_config: DestinationTestConfiguration) -> None:
"""Fail on DROP TABLES in destination init. Command runs again."""
Expand Down Expand Up @@ -224,7 +274,9 @@ def test_drop_destination_tables_fails(destination_config: DestinationTestConfig


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_fail_after_drop_tables(destination_config: DestinationTestConfiguration) -> None:
"""Fail directly after drop tables. Command runs again ignoring destination tables missing."""
Expand Down Expand Up @@ -255,7 +307,9 @@ def test_fail_after_drop_tables(destination_config: DestinationTestConfiguration


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_load_step_fails(destination_config: DestinationTestConfiguration) -> None:
"""Test idempotence. pipeline.load() fails. Command can be run again successfully"""
Expand All @@ -278,7 +332,9 @@ def test_load_step_fails(destination_config: DestinationTestConfiguration) -> No


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_resource_regex(destination_config: DestinationTestConfiguration) -> None:
source = droppable_source()
Expand All @@ -296,7 +352,9 @@ def test_resource_regex(destination_config: DestinationTestConfiguration) -> Non


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_drop_nothing(destination_config: DestinationTestConfiguration) -> None:
"""No resources, no state keys. Nothing is changed."""
Expand Down Expand Up @@ -340,7 +398,9 @@ def test_drop_all_flag(destination_config: DestinationTestConfiguration) -> None


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConfiguration) -> None:
"""Pipeline can be run again after dropping some resources"""
Expand All @@ -359,7 +419,9 @@ def test_run_pipeline_after_partial_drop(destination_config: DestinationTestConf


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_drop_state_only(destination_config: DestinationTestConfiguration) -> None:
"""Pipeline can be run again after dropping some resources"""
Expand Down

0 comments on commit 4cd3dfa

Please sign in to comment.