diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py
index 0f2500c2cd..36a9cc3b6e 100644
--- a/dlt/common/destination/capabilities.py
+++ b/dlt/common/destination/capabilities.py
@@ -55,6 +55,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
     insert_values_writer_type: str = "default"
     supports_multiple_statements: bool = True
     supports_clone_table: bool = False
     """Destination supports CREATE TABLE ... CLONE ... statements"""
+    max_table_nesting: Optional[int] = None  # destination can overwrite max table nesting
 
     # do not allow to create default value, destination caps must be always explicitly inserted into container
diff --git a/dlt/destinations/impl/destination/__init__.py b/dlt/destinations/impl/destination/__init__.py
index fbad2d570f..560c9d4eda 100644
--- a/dlt/destinations/impl/destination/__init__.py
+++ b/dlt/destinations/impl/destination/__init__.py
@@ -1,3 +1,4 @@
+from typing import Optional
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.data_writers import TLoaderFileFormat
 
@@ -5,10 +6,12 @@
 def capabilities(
     preferred_loader_file_format: TLoaderFileFormat = "puae-jsonl",
     naming_convention: str = "direct",
+    max_table_nesting: Optional[int] = 0,
 ) -> DestinationCapabilitiesContext:
     caps = DestinationCapabilitiesContext.generic_capabilities(preferred_loader_file_format)
     caps.supported_loader_file_formats = ["puae-jsonl", "parquet"]
     caps.supports_ddl_transactions = False
     caps.supports_transactions = False
     caps.naming_convention = naming_convention
+    caps.max_table_nesting = max_table_nesting
     return caps
diff --git a/dlt/destinations/impl/destination/factory.py b/dlt/destinations/impl/destination/factory.py
index 992d78795b..1195daf972 100644
--- a/dlt/destinations/impl/destination/factory.py
+++ b/dlt/destinations/impl/destination/factory.py
@@ -36,8 +36,9 @@ class DestinationInfo(t.NamedTuple):
 class destination(Destination[GenericDestinationClientConfiguration, "DestinationClient"]):
     def capabilities(self) -> DestinationCapabilitiesContext:
         return capabilities(
-            self.config_params.get("loader_file_format", "puae-jsonl"),
-            self.config_params.get("naming_convention", "direct"),
+            preferred_loader_file_format=self.config_params.get("loader_file_format", "puae-jsonl"),
+            naming_convention=self.config_params.get("naming_convention", "direct"),
+            max_table_nesting=self.config_params.get("max_table_nesting", None),
         )
 
     @property
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py
index 042a62e8fb..4469ad2338 100644
--- a/dlt/pipeline/pipeline.py
+++ b/dlt/pipeline/pipeline.py
@@ -456,7 +456,18 @@ def normalize(
             return None
 
         # make sure destination capabilities are available
-        self._get_destination_capabilities()
+        caps = self._get_destination_capabilities()
+        if caps.max_table_nesting is not None:
+            # destination settings override normalizer settings in schema
+            from dlt.common.normalizers.json.relational import (
+                DataItemNormalizer as RelationalNormalizer,
+            )
+
+            RelationalNormalizer.update_normalizer_config(
+                self.default_schema, {"max_nesting": caps.max_table_nesting}
+            )
+            self._schema_storage.save_schema(self.default_schema)
+
         # create default normalize config
         normalize_config = NormalizeConfiguration(
             workers=workers,
diff --git a/docs/website/docs/dlt-ecosystem/destinations/destination.md b/docs/website/docs/dlt-ecosystem/destinations/destination.md
index 174eaa7837..5ccc7fe436 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/destination.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/destination.md
@@ -69,7 +69,7 @@ the sink from your pipeline constructor. Now you can run your pipeline and see t
 The full signature of the destination decorator plus its function is the following:
 
 ```py
-@dlt.destination(batch_size=10, loader_file_format="jsonl", name="my_sink", naming="direct")
+@dlt.destination(batch_size=10, loader_file_format="jsonl", name="my_sink", naming_convention="direct", max_table_nesting=0, skip_dlt_columns_and_tables=True)
 def sink(items: TDataItems, table: TTableSchema) -> None:
     ...
 ```
@@ -82,6 +82,8 @@ in any way you like.
   this can be `jsonl` or `parquet`.
 * The `name` parameter on the destination decorator defines the name of the destination that gets created by the destination decorator.
 * The `naming_convention` parameter on the destination decorator controls how table and column
 names are normalized. The default is `direct`, which keeps all names the same.
+* The `max_table_nesting` parameter on the destination decorator defines how deep the normalizer will go to normalize complex fields on your data to create subtables. It overrides any setting on your `source` and defaults to zero, so no nested tables are created.
+* The `skip_dlt_columns_and_tables` parameter on the destination decorator defines whether the internal dlt tables and columns will be fed into the custom destination function. This is set to True by default.
 
 #### Sink function
diff --git a/tests/destinations/test_custom_destination.py b/tests/destinations/test_custom_destination.py
index cc6396c4ee..be29bc3008 100644
--- a/tests/destinations/test_custom_destination.py
+++ b/tests/destinations/test_custom_destination.py
@@ -473,3 +473,38 @@ def test_sink(items, table):
     assert found_dlt_column != remove_stuff
     assert found_dlt_table != remove_stuff
     assert found_dlt_column_value != remove_stuff
+
+
+@pytest.mark.parametrize("nesting", [None, 0, 1, 3])
+def test_max_nesting_level(nesting: int) -> None:
+    # data with 4 nesting levels
+    data = [
+        {
+            "level": 1,
+            "children": [{"level": 2, "children": [{"level": 3, "children": [{"level": 4}]}]}],
+        }
+    ]
+
+    found_tables = set()
+
+    @dlt.destination(loader_file_format="puae-jsonl", max_table_nesting=nesting)
+    def test_sink(items, table):
+        nonlocal found_tables
+        found_tables.add(table["name"])
+
+    @dlt.source(max_table_nesting=2)
+    def source():
+        yield dlt.resource(data, name="data")
+
+    p = dlt.pipeline("sink_test", destination=test_sink, full_refresh=True)
+    p.run(source())
+
+    # fall back to the source setting
+    if nesting is None:
+        assert len(found_tables) == 3
+    else:
+        # use the destination setting
+        assert len(found_tables) == nesting + 1
+
+    for table in found_tables:
+        assert table.startswith("data")
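
A quick usage sketch of the new `max_table_nesting` decorator parameter, condensed from the test above. The sink body, pipeline name, and sample data are illustrative; the destination-level setting takes precedence over `max_table_nesting` on the source:

```py
import dlt

# collect the table names the custom destination receives
found_tables = set()

@dlt.destination(loader_file_format="puae-jsonl", max_table_nesting=0)
def flat_sink(items, table) -> None:
    found_tables.add(table["name"])

@dlt.source(max_table_nesting=2)  # overridden by the destination capability
def demo_source():
    yield dlt.resource([{"level": 1, "children": [{"level": 2}]}], name="data")

pipeline = dlt.pipeline("nesting_demo", destination=flat_sink, full_refresh=True)
pipeline.run(demo_source())

# with max_table_nesting=0 only the root table is produced; "children" stays
# embedded as a complex column instead of becoming a "data__children" subtable
assert found_tables == {"data"}
```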
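
Continuing from the sketch above, this shows where the override ends up, mirroring the `normalize()` change in `pipeline.py`. I'm assuming `get_normalizer_config` exists as the read counterpart of the `update_normalizer_config` call used in the diff:

```py
from dlt.common.normalizers.json.relational import (
    DataItemNormalizer as RelationalNormalizer,
)

# after pipeline.run(), normalize() has persisted the destination's limit
# into the default schema's normalizer settings
config = RelationalNormalizer.get_normalizer_config(pipeline.default_schema)
print(config["max_nesting"])  # 0 for the sink above
```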