add nesting level setting to custom destination
update readme
sh-rp committed Mar 18, 2024
1 parent fc15c94 commit 890c0e0
Showing 6 changed files with 57 additions and 4 deletions.
1 change: 1 addition & 0 deletions dlt/common/destination/capabilities.py
@@ -55,6 +55,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
     insert_values_writer_type: str = "default"
     supports_multiple_statements: bool = True
     supports_clone_table: bool = False
     """Destination supports CREATE TABLE ... CLONE ... statements"""
+    max_table_nesting: Optional[int] = None  # destination can overwrite max table nesting
 
     # do not allow to create default value, destination caps must be always explicitly inserted into container
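Because the new capability defaults to `None`, destinations that do not set it leave the schema's own normalizer settings untouched. A quick sketch of that contract, assuming the `generic_capabilities` helper used elsewhere in this commit (the assert is illustrative, not part of the diff):

```py
from dlt.common.destination import DestinationCapabilitiesContext

# generic capabilities carry no nesting override by default, so the
# source/schema setting wins during normalization
caps = DestinationCapabilitiesContext.generic_capabilities("jsonl")
assert caps.max_table_nesting is None
```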
3 changes: 3 additions & 0 deletions dlt/destinations/impl/destination/__init__.py
@@ -1,14 +1,17 @@
+from typing import Optional
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.data_writers import TLoaderFileFormat
 
 
 def capabilities(
     preferred_loader_file_format: TLoaderFileFormat = "puae-jsonl",
     naming_convention: str = "direct",
+    max_table_nesting: Optional[int] = 0,
 ) -> DestinationCapabilitiesContext:
     caps = DestinationCapabilitiesContext.generic_capabilities(preferred_loader_file_format)
     caps.supported_loader_file_formats = ["puae-jsonl", "parquet"]
     caps.supports_ddl_transactions = False
     caps.supports_transactions = False
     caps.naming_convention = naming_convention
+    caps.max_table_nesting = max_table_nesting
     return caps
5 changes: 3 additions & 2 deletions dlt/destinations/impl/destination/factory.py
@@ -36,8 +36,9 @@ class DestinationInfo(t.NamedTuple):
 class destination(Destination[GenericDestinationClientConfiguration, "DestinationClient"]):
     def capabilities(self) -> DestinationCapabilitiesContext:
         return capabilities(
-            self.config_params.get("loader_file_format", "puae-jsonl"),
-            self.config_params.get("naming_convention", "direct"),
+            preferred_loader_file_format=self.config_params.get("loader_file_format", "puae-jsonl"),
+            naming_convention=self.config_params.get("naming_convention", "direct"),
+            max_table_nesting=self.config_params.get("max_table_nesting", None),
         )
 
     @property
13 changes: 12 additions & 1 deletion dlt/pipeline/pipeline.py
@@ -456,7 +456,18 @@ def normalize(
             return None
 
         # make sure destination capabilities are available
-        self._get_destination_capabilities()
+        caps = self._get_destination_capabilities()
+        if caps.max_table_nesting is not None:
+            # destination settings override normalizer settings in schema
+            from dlt.common.normalizers.json.relational import (
+                DataItemNormalizer as RelationalNormalizer,
+            )
+
+            RelationalNormalizer.update_normalizer_config(
+                self.default_schema, {"max_nesting": caps.max_table_nesting}
+            )
+            self._schema_storage.save_schema(self.default_schema)
+
         # create default normalize config
         normalize_config = NormalizeConfiguration(
             workers=workers,
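This hunk is the heart of the change: during `normalize()`, a destination-supplied `max_table_nesting` is written into the schema's relational normalizer config, overriding whatever the source set. The same knob can be turned manually; a minimal sketch, assuming `dlt.Schema` is constructible as shown (the schema name is illustrative):

```py
import dlt
from dlt.common.normalizers.json.relational import DataItemNormalizer as RelationalNormalizer

schema = dlt.Schema("example")
# cap nesting at one level of child tables; normalize() now does this
# automatically when the destination caps define max_table_nesting
RelationalNormalizer.update_normalizer_config(schema, {"max_nesting": 1})
```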
4 changes: 3 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/destination.md
@@ -69,7 +69,7 @@
 The full signature of the destination decorator plus its function is the following:
 
 ```py
-@dlt.destination(batch_size=10, loader_file_format="jsonl", name="my_sink", naming="direct")
+@dlt.destination(batch_size=10, loader_file_format="jsonl", name="my_sink", naming_convention="direct", max_table_nesting=0, skip_dlt_columns_and_tables=True)
 def sink(items: TDataItems, table: TTableSchema) -> None:
     ...
 ```
@@ -82,6 +82,8 @@ in any way you like.
 this can be `jsonl` or `parquet`.
 * The `name` parameter on the destination decorator defines the name of the destination that gets created by the destination decorator.
 * The `naming_convention` parameter on the destination decorator defines the naming convention used by the destination. This controls
 how table and column names are normalized. The default is `direct` which will keep all names the same.
+* The `max_table_nesting` parameter on the destination decorator defines how deep the normalizer will go when unpacking nested fields in your data into child tables. It overrides any setting on your `source` and defaults to zero, so no nested tables are created.
+* The `skip_dlt_columns_and_tables` parameter on the destination decorator defines whether internal dlt tables and columns will be fed into the custom destination function. This is set to `True` by default.

#### Sink function
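Taken together, the documented parameters let a custom destination receive fully flattened data. A hedged end-to-end sketch (pipeline and function names are illustrative, not from this commit):

```py
import dlt

@dlt.destination(loader_file_format="jsonl", max_table_nesting=0)
def print_sink(items, table):
    # with max_table_nesting=0 the nested "children" list stays embedded in
    # the root table's rows, so only the "rows" table ever arrives here
    print(table["name"], items)

pipeline = dlt.pipeline("nesting_demo", destination=print_sink)
pipeline.run([{"id": 1, "children": [{"id": 2}]}], table_name="rows")
```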
35 changes: 35 additions & 0 deletions tests/destinations/test_custom_destination.py
@@ -473,3 +473,38 @@ def test_sink(items, table):
     assert found_dlt_column != remove_stuff
     assert found_dlt_table != remove_stuff
     assert found_dlt_column_value != remove_stuff
+
+
+@pytest.mark.parametrize("nesting", [None, 0, 1, 3])
+def test_max_nesting_level(nesting: int) -> None:
+    # 4 nesting levels
+    data = [
+        {
+            "level": 1,
+            "children": [{"level": 2, "children": [{"level": 3, "children": [{"level": 4}]}]}],
+        }
+    ]
+
+    found_tables = set()
+
+    @dlt.destination(loader_file_format="puae-jsonl", max_table_nesting=nesting)
+    def test_sink(items, table):
+        nonlocal found_tables
+        found_tables.add(table["name"])
+
+    @dlt.source(max_table_nesting=2)
+    def source():
+        yield dlt.resource(data, name="data")
+
+    p = dlt.pipeline("sink_test", destination=test_sink, full_refresh=True)
+    p.run(source())
+
+    # fall back to source setting
+    if nesting is None:
+        assert len(found_tables) == 3
+    else:
+        # use destination setting
+        assert len(found_tables) == nesting + 1
+
+    for table in found_tables:
+        assert table.startswith("data")

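For intuition on the assertions: dlt names child tables `parent__field`, so the expected table sets per parametrized value would look roughly like this (illustrative, assuming dlt's standard child-table naming; `None` falls back to the source's `max_table_nesting=2`):

```py
expected = {
    None: {"data", "data__children", "data__children__children"},
    0: {"data"},
    1: {"data", "data__children"},
    3: {
        "data",
        "data__children",
        "data__children__children",
        "data__children__children__children",
    },
}
```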