clones schema without data tables when resources without source are extracted, adds tests
rudolfix committed Dec 11, 2024
1 parent 7e8667e commit 7f137f1
Showing 4 changed files with 133 additions and 25 deletions.
7 changes: 6 additions & 1 deletion dlt/extract/extract.py
@@ -87,7 +87,12 @@ def choose_schema() -> Schema:
schema_ = schema
# take pipeline schema to make newest version visible to the resources
elif pipeline.default_schema_name:
schema_ = pipeline.schemas[pipeline.default_schema_name].clone()
# clones with name which will drop previous hashes
schema_ = pipeline.schemas[pipeline.default_schema_name].clone(
with_name=pipeline.default_schema_name
)
# delete data tables
schema_.drop_tables(schema_.data_table_names(include_incomplete=True))
else:
schema_ = pipeline._make_schema_with_default_name()
return schema_
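The hunk above is the core of the fix: when standalone resources (with no enclosing source) are extracted, the pipeline's default schema is cloned under its own name, which resets the previous version hashes, and all data tables are dropped from the clone so only the dlt system tables remain. A minimal sketch of the same calls, assuming a pipeline that already has a default schema from an earlier run (the pipeline name mirrors the tests below):

import dlt

pipeline = dlt.pipeline(pipeline_name="droppable", destination="duckdb")
if pipeline.default_schema_name:
    # clone under the same name; re-naming resets the previous version hashes
    schema_ = pipeline.schemas[pipeline.default_schema_name].clone(
        with_name=pipeline.default_schema_name
    )
    # drop every data table, complete or incomplete; dlt system tables stay
    schema_.drop_tables(schema_.data_table_names(include_incomplete=True))
    print(schema_.data_table_names())  # expected to be empty now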
55 changes: 45 additions & 10 deletions tests/load/pipeline/test_drop.py
@@ -27,13 +27,17 @@ def _attach(pipeline: Pipeline) -> Pipeline:


@dlt.source(section="droppable", name="droppable")
def droppable_source() -> List[DltResource]:
def droppable_source(drop_columns: bool = False) -> List[DltResource]:
@dlt.resource
def droppable_a(
a: dlt.sources.incremental[int] = dlt.sources.incremental("a", 0)
a: dlt.sources.incremental[int] = dlt.sources.incremental("a", 0, range_start="open")
) -> Iterator[Dict[str, Any]]:
yield dict(a=1, b=2, c=3)
yield dict(a=4, b=23, c=24)
if drop_columns:
yield dict(a=1, b=2)
yield dict(a=4, b=23)
else:
yield dict(a=1, b=2, c=3)
yield dict(a=4, b=23, c=24)

@dlt.resource
def droppable_b(
@@ -47,9 +51,17 @@ def droppable_c(
qe: dlt.sources.incremental[int] = dlt.sources.incremental("qe"),
) -> Iterator[Dict[str, Any]]:
# Grandchild table
yield dict(
asdasd=2424, qe=111, items=[dict(k=2, r=2, labels=[dict(name="abc"), dict(name="www")])]
)
if drop_columns:
# dropped asdasd, items[r], items.labels.value
yield dict(qe=111, items=[dict(k=2, labels=[dict(name="abc"), dict(name="www")])])
else:
yield dict(
asdasd=2424,
qe=111,
items=[
dict(k=2, r=2, labels=[dict(name="abc", value=1), dict(name="www", value=2)])
],
)

@dlt.resource
def droppable_d(
@@ -134,11 +146,17 @@ def assert_destination_state_loaded(pipeline: Pipeline) -> None:
),
ids=lambda x: x.name,
)
def test_drop_command_resources_and_state(destination_config: DestinationTestConfiguration) -> None:
@pytest.mark.parametrize("in_source", (True, False))
def test_drop_command_resources_and_state(
destination_config: DestinationTestConfiguration, in_source: bool
) -> None:
"""Test the drop command with resource and state path options and
verify correct data is deleted from destination and locally"""
source = droppable_source()
pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), dev_mode=True)
source: Any = droppable_source()
if not in_source:
source = list(source.selected_resources.values())

pipeline = destination_config.setup_pipeline("droppable", dev_mode=True)
info = pipeline.run(source, **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(pipeline, *pipeline.default_schema.tables.keys()) == {
@@ -173,6 +191,9 @@ def test_drop_command_resources_and_state(destination_config: DestinationTestCon
assert_destination_state_loaded(pipeline)

# now run the same droppable_source to see if tables are recreated and they contain the right number of items
source = droppable_source(drop_columns=True)
if not in_source:
source = list(source.selected_resources.values())
info = pipeline.run(source, **destination_config.run_kwargs)
assert_load_info(info)
# 2 versions (one dropped and replaced with schema with dropped tables, then we added missing tables)
@@ -192,6 +213,20 @@ def test_drop_command_resources_and_state(destination_config: DestinationTestCon
"droppable_c__items": 1,
"droppable_c__items__labels": 2,
}
# check if columns got correctly dropped
droppable_a_schema = pipeline.default_schema.get_table("droppable_a")
# this table was not dropped so column still exists
assert "c" in droppable_a_schema["columns"]
# dropped asdasd, items[r], items.labels.value
droppable_c_schema = pipeline.default_schema.get_table("droppable_c")
assert "asdasd" not in droppable_c_schema["columns"]
assert "qe" in droppable_c_schema["columns"]
droppable_c_i_schema = pipeline.default_schema.get_table("droppable_c__items")
assert "r" not in droppable_c_i_schema["columns"]
assert "k" in droppable_c_i_schema["columns"]
droppable_c_l_schema = pipeline.default_schema.get_table("droppable_c__items__labels")
assert "value" not in droppable_c_l_schema["columns"]
assert "name" in droppable_c_l_schema["columns"]


@pytest.mark.parametrize(
72 changes: 58 additions & 14 deletions tests/load/pipeline/test_refresh_modes.py
@@ -1,5 +1,5 @@
from typing import Any, List

import os
import pytest
import dlt
from dlt.common.destination.exceptions import DestinationUndefinedEntity
@@ -12,7 +12,7 @@
from dlt.extract.source import DltSource
from dlt.pipeline.state_sync import load_pipeline_state_from_destination

from tests.utils import clean_test_storage
from tests.utils import clean_test_storage, TEST_STORAGE_ROOT
from tests.pipeline.utils import (
_is_filesystem,
assert_load_info,
@@ -106,19 +106,42 @@ def some_data_4():
),
ids=lambda x: x.name,
)
def test_refresh_drop_sources(destination_config: DestinationTestConfiguration):
pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources")
@pytest.mark.parametrize("in_source", (True, False))
@pytest.mark.parametrize("with_wipe", (True, False))
def test_refresh_drop_sources(
destination_config: DestinationTestConfiguration, in_source: bool, with_wipe: bool
):
# do not place duckdb in the working dir, because we may wipe it
os.environ["DESTINATION__DUCKDB__CREDENTIALS"] = os.path.join(
TEST_STORAGE_ROOT, "refresh_source_db.duckdb"
)

# First run pipeline so destination tables are created
info = pipeline.run(
refresh_source(first_run=True, drop_sources=True), **destination_config.run_kwargs
pipeline = destination_config.setup_pipeline("refresh_source")

data: Any = refresh_source(first_run=True, drop_sources=True).with_resources(
"some_data_1", "some_data_2"
)
if not in_source:
data = list(data.selected_resources.values())

# First run pipeline so destination tables are created
info = pipeline.run(data, refresh="drop_sources", **destination_config.run_kwargs)
assert_load_info(info)

# Second run of pipeline with only selected resources
if with_wipe:
pipeline._wipe_working_folder()
pipeline = destination_config.setup_pipeline("refresh_source")

data = refresh_source(first_run=False, drop_sources=True).with_resources(
"some_data_1", "some_data_2"
)
if not in_source:
data = list(data.selected_resources.values())

info = pipeline.run(
refresh_source(first_run=False, drop_sources=True).with_resources(
"some_data_1", "some_data_2"
),
data,
refresh="drop_sources",
**destination_config.run_kwargs,
)

@@ -199,16 +222,37 @@ def test_existing_schema_hash(destination_config: DestinationTestConfiguration):
),
ids=lambda x: x.name,
)
def test_refresh_drop_resources(destination_config: DestinationTestConfiguration):
@pytest.mark.parametrize("in_source", (True, False))
@pytest.mark.parametrize("with_wipe", (True, False))
def test_refresh_drop_resources(
destination_config: DestinationTestConfiguration, in_source: bool, with_wipe: bool
):
# do not place duckdb in the working dir, because we may wipe it
os.environ["DESTINATION__DUCKDB__CREDENTIALS"] = os.path.join(
TEST_STORAGE_ROOT, "refresh_source_db.duckdb"
)
# First run pipeline with load to destination so tables are created
pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_tables")
pipeline = destination_config.setup_pipeline("refresh_source")

info = pipeline.run(refresh_source(first_run=True), **destination_config.run_kwargs)
data: Any = refresh_source(first_run=True)
if not in_source:
data = list(data.selected_resources.values())

info = pipeline.run(data, refresh="drop_resources", **destination_config.run_kwargs)
assert_load_info(info)

# Second run of pipeline with only selected resources
if with_wipe:
pipeline._wipe_working_folder()
pipeline = destination_config.setup_pipeline("refresh_source")

data = refresh_source(first_run=False).with_resources("some_data_1", "some_data_2")
if not in_source:
data = list(data.selected_resources.values())

info = pipeline.run(
refresh_source(first_run=False).with_resources("some_data_1", "some_data_2"),
data,
refresh="drop_resources",
**destination_config.run_kwargs,
)

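Both refresh tests now pass the refresh mode through pipeline.run rather than through the pipeline constructor. A minimal usage sketch with a hypothetical resource, under my reading of the two modes (drop_sources wipes tables and state for the whole source schema, drop_resources only for the resources being extracted):

import dlt

@dlt.resource
def some_numbers():
    # hypothetical resource used only for illustration
    yield [1, 2, 3]

pipeline = dlt.pipeline(pipeline_name="refresh_example", destination="duckdb")
# wipe all tables and state belonging to the schema of the extracted source
pipeline.run(some_numbers(), refresh="drop_sources")
# wipe only the tables and resource state of the resources being extracted
pipeline.run(some_numbers(), refresh="drop_resources")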
24 changes: 24 additions & 0 deletions tests/pipeline/test_pipeline.py
@@ -1566,6 +1566,30 @@ def test_drop() -> None:
pipeline.run([1, 2, 3], table_name="numbers")


def test_source_schema_in_resource() -> None:
run_count = 0

@dlt.resource
def schema_inspector():
schema = dlt.current.source_schema()
if run_count == 0:
assert "schema_inspector" not in schema.tables
if run_count == 1:
assert "schema_inspector" in schema.tables
assert schema.tables["schema_inspector"]["columns"]["value"]["x-custom"] == "X" # type: ignore[typeddict-item]

yield [1, 2, 3]

pipeline = dlt.pipeline(pipeline_name="test_inspector", destination="duckdb")
pipeline.run(schema_inspector())

# add custom annotation
pipeline.default_schema.tables["schema_inspector"]["columns"]["value"]["x-custom"] = "X" # type: ignore[typeddict-unknown-key]

run_count += 1
pipeline.run(schema_inspector())


def test_schema_version_increase_and_source_update() -> None:
now = pendulum.now()

