
Commit

add direct execution from readable relation
sh-rp committed Dec 19, 2024
1 parent cfcb5a1 commit 3e352aa
Showing 3 changed files with 72 additions and 22 deletions.
73 changes: 53 additions & 20 deletions dlt/destinations/transformations/__init__.py
@@ -1,4 +1,15 @@
from typing import Callable, Literal, Union, Any, Generator, List, TYPE_CHECKING, Iterable
from typing import (
    Callable,
    Literal,
    Union,
    Any,
    Generator,
    List,
    TYPE_CHECKING,
    Iterable,
    Optional,
)

from dataclasses import dataclass
from functools import wraps
@@ -52,15 +63,48 @@ def wrapper(*args: Any, **kwargs: Any) -> List[TTransformationFunc]:
    return decorator


def execute_transformation(
    select_clause: str,
    client: Any,
    table_name: Optional[str] = None,
    write_disposition: Optional[str] = None,
    materialization: Optional[str] = None,
) -> None:
    if write_disposition == "replace":
        client.execute(f"CREATE OR REPLACE {materialization} {table_name} AS {select_clause}")
    elif write_disposition == "append" and materialization == "table":
        try:
            client.execute(f"INSERT INTO {table_name} {select_clause}")
        except Exception:
            client.execute(f"CREATE TABLE {table_name} AS {select_clause}")
    else:
        raise ValueError(
            f"Write disposition {write_disposition} is not supported for "
            f"materialization {materialization}"
        )


def run_transformations(
    dataset: SupportsReadableDataset,
    transformations: Union[TTransformationFunc, List[TTransformationFunc]],
    transformations: Union[
        TTransformationFunc, List[TTransformationFunc], SupportsReadableRelation
    ],
    *,
    table_name: Optional[str] = None,
    write_disposition: Optional[str] = None,
    materialization: Optional[str] = None,
) -> None:
    if not isinstance(transformations, Iterable):
        transformations = [transformations]

    # TODO: fix typing
    with dataset.sql_client as client:  # type: ignore
        if isinstance(transformations, SupportsReadableRelation):
            execute_transformation(
                transformations.query, client, table_name, write_disposition, materialization
            )
            return

        if not isinstance(transformations, Iterable):
            transformations = [transformations]

        # TODO: fix typing
        for transformation in transformations:
            # get transformation settings
            table_name = transformation.__transformation_args__["table_name"]  # type: ignore
@@ -78,17 +122,6 @@ def run_transformations(
            # materialize result
            select_clause = relation.query

            if write_disposition == "replace":
                client.execute(
                    f"CREATE OR REPLACE {materialization} {table_name} AS {select_clause}"
                )
            elif write_disposition == "append" and materialization == "table":
                try:
                    client.execute(f"INSERT INTO {table_name} {select_clause}")
                except Exception:
                    client.execute(f"CREATE TABLE {table_name} AS {select_clause}")
            else:
                raise ValueError(
                    f"Write disposition {write_disposition} is not supported for "
                    f"materialization {materialization}"
                )
            execute_transformation(
                select_clause, client, table_name, write_disposition, materialization
            )
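
Taken together, these changes let a readable relation be materialized directly. A hedged usage sketch, not part of the commit (the `dataset` and `items` names are illustrative; `run_transformations`, its keyword arguments, and `relation.query` are taken from the diff above):

from dlt.destinations.transformations import run_transformations

# `dataset` is assumed to be a SupportsReadableDataset backed by a SQL client;
# `dataset.items` is a readable relation and `mutate` builds a derived query.
items = dataset.items
relation = items.mutate(new_col=items.id)

# Because `relation` is a SupportsReadableRelation, its SELECT (relation.query)
# goes straight to execute_transformation, which with these arguments issues
# roughly: CREATE OR REPLACE table direct AS <relation.query>
run_transformations(
    dataset,
    relation,
    table_name="direct",
    write_disposition="replace",
    materialization="table",
)
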
15 changes: 13 additions & 2 deletions dlt/pipeline/pipeline.py
@@ -1774,8 +1774,19 @@ def dataset(
        )

    def transform(
        self, transformations: Union[TTransformationFunc, List[TTransformationFunc]]
        self,
        transformations: Union[TTransformationFunc, List[TTransformationFunc]],
        *,
        table_name: Optional[str] = None,
        write_disposition: Optional[str] = "replace",
        materialization: Optional[str] = "table",
    ) -> None:
        from dlt.destinations.transformations import run_transformations

        run_transformations(self.dataset(), transformations)
        run_transformations(
            self.dataset(),
            transformations,
            table_name=table_name,
            write_disposition=write_disposition,
            materialization=materialization,
        )
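
A minimal usage sketch of the extended `Pipeline.transform` signature, not part of the commit (assumes a pipeline `p` that has already loaded an `items` table, as in the test below; the keyword defaults shown are the ones added here):

items = p.dataset().items

# A readable relation can now be passed instead of decorated transformation
# functions; its SELECT is materialized into a table named "direct".
p.transform(
    items.mutate(new_col=items.id),
    table_name="direct",
    write_disposition="replace",  # default
    materialization="table",  # default
)
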
6 changes: 6 additions & 0 deletions tests/load/test_transformation.py
@@ -37,3 +37,9 @@ def aggregate_transformation(dataset: SupportsReadableDataset) -> SupportsReadab
    # check aggregated table for both fields
    assert p.dataset().aggregated_items.fetchone()[0] == reduce(lambda a, b: a + b, range(10))
    assert p.dataset().aggregated_items.fetchone()[1] == (reduce(lambda a, b: a + b, range(10)) * 2)

    # check direct execution from a readable relation
    items_table = p.dataset().items
    p.transform(items_table.mutate(new_col=items_table.id), table_name="direct")

    print(p.dataset().direct.df())
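
A hedged follow-up check that could replace the print above, not part of the commit (assumes `df()` returns a pandas DataFrame):

    # verify the materialized "direct" table exposes the mutated column
    direct_df = p.dataset().direct.df()
    assert "new_col" in direct_df.columns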
