deprecates force_icebergs, adds hive table format to opt out

rudolfix committed Sep 3, 2024
1 parent 8fb171a commit baa6f9b
Showing 11 changed files with 76 additions and 32 deletions.
3 changes: 0 additions & 3 deletions dlt/common/schema/schema.py
@@ -1126,9 +1126,6 @@ def _renormalize_schema_identifiers(

def _configure_normalizers(self, explicit_normalizers: TNormalizersConfig) -> None:
"""Gets naming and item normalizer from schema yaml, config providers and destination capabilities and applies them to schema."""
-     # import desired modules
-     from dlt.common.schema.normalizers import import_normalizers
-
normalizers_config, to_naming, item_normalizer_class = import_normalizers(
explicit_normalizers, self._normalizers_config
)
2 changes: 1 addition & 1 deletion dlt/common/schema/typing.py
@@ -66,7 +66,7 @@
]
"""Known hints of a column used to declare hint regexes."""

- TTableFormat = Literal["iceberg", "delta"]
+ TTableFormat = Literal["iceberg", "delta", "hive"]
TFileFormat = Literal[Literal["preferred"], TLoaderFileFormat]
TTypeDetections = Literal[
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
12 changes: 11 additions & 1 deletion dlt/common/warnings.py
@@ -39,7 +39,8 @@ def __init__(
if isinstance(expected_due, semver.VersionInfo)
else semver.parse_version_info(expected_due)
)
-         self.expected_due = expected_due if expected_due is not None else self.since.bump_minor()
+         # we deprecate across major version since 1.0.0
+         self.expected_due = expected_due if expected_due is not None else self.since.bump_major()

def __str__(self) -> str:
message = (
@@ -57,6 +58,15 @@ def __init__(self, message: str, *args: typing.Any, expected_due: VersionString = None) -> None:
)


+ class Dlt100DeprecationWarning(DltDeprecationWarning):
+     V100 = semver.parse_version_info("1.0.0")
+
+     def __init__(self, message: str, *args: typing.Any, expected_due: VersionString = None) -> None:
+         super().__init__(
+             message, *args, since=Dlt100DeprecationWarning.V100, expected_due=expected_due
+         )


# show dlt deprecations once
warnings.simplefilter("once", DltDeprecationWarning)
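
A short sketch of what the new class encodes (the assertions follow from since being pinned to 1.0.0 and the bump_major default above; the message text is illustrative):

import warnings
from dlt.common.warnings import Dlt100DeprecationWarning

w = Dlt100DeprecationWarning("`force_iceberg` is deprecated")
assert str(w.since) == "1.0.0"
assert str(w.expected_due) == "2.0.0"  # bump_major() applied to 1.0.0

# emitted at most once per location thanks to the simplefilter above
warnings.warn("`force_iceberg` is deprecated", Dlt100DeprecationWarning)
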

5 changes: 4 additions & 1 deletion dlt/destinations/impl/athena/athena.py
@@ -520,7 +520,10 @@ def _is_iceberg_table(
table_format = table.get("table_format")
# all dlt tables that are not loaded via files are iceberg tables, no matter if they are on staging or regular dataset
# all other iceberg tables are HIVE (external) tables on staging dataset
-         return (table_format == "iceberg" and not is_staging_dataset) or table[
+         table_format_iceberg = table_format == "iceberg" or (
+             self.config.force_iceberg and table_format is None
+         )
+         return (table_format_iceberg and not is_staging_dataset) or table[
"write_disposition"
] == "skip"

15 changes: 10 additions & 5 deletions dlt/destinations/impl/athena/configuration.py
@@ -1,10 +1,12 @@
import dataclasses
from typing import ClassVar, Final, List, Optional
+ import warnings

from dlt.common import logger
from dlt.common.configuration import configspec
from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration
from dlt.common.configuration.specs import AwsCredentials
+ from dlt.common.warnings import Dlt100DeprecationWarning


@configspec
@@ -14,21 +16,24 @@ class AthenaClientConfiguration(DestinationClientDwhWithStagingConfiguration):
credentials: AwsCredentials = None
athena_work_group: Optional[str] = None
aws_data_catalog: Optional[str] = "awsdatacatalog"
supports_truncate_command: bool = False
force_iceberg: Optional[bool] = None

__config_gen_annotations__: ClassVar[List[str]] = ["athena_work_group"]

def on_resolved(self) -> None:
if self.force_iceberg is not None:
-         logger.warning(
-             "force_iceberg flag is no longer supported. please set table format explicitly on"
-             " the resources"
-         )
+         warnings.warn(
+             "The `force_iceberg` flag is deprecated. If you upgraded dlt on an existing pipeline"
+             " and have data already loaded, keep this flag to make sure your data stays"
+             " consistent. If you are creating a new dataset and no data was loaded, set"
+             " `table_format='iceberg'` on your resources explicitly.",
+             Dlt100DeprecationWarning,
+             stacklevel=1,
+         )

def __str__(self) -> str:
"""Return displayable destination location"""
if self.staging_config:
-         return str(self.staging_config.credentials)
+         return f"{self.staging_config} on {self.aws_data_catalog}"
else:
return "[no staging set]"
2 changes: 1 addition & 1 deletion dlt/destinations/impl/athena/factory.py
@@ -35,7 +35,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
# athena only supports loading from staged files on s3 for now
caps.preferred_loader_file_format = None
caps.supported_loader_file_formats = []
caps.supported_table_formats = ["iceberg"]
caps.supported_table_formats = ["iceberg", "hive"]
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["parquet", "jsonl"]
# athena is storing all identifiers in lower case and is case insensitive
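
The capability change can be introspected before declaring resources; a minimal sketch using the _raw_capabilities hook shown above:

from dlt.destinations import athena

caps = athena()._raw_capabilities()
assert "iceberg" in caps.supported_table_formats
assert "hive" in caps.supported_table_formats
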
1 change: 1 addition & 0 deletions dlt/destinations/impl/databricks/factory.py
@@ -22,6 +22,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supported_loader_file_formats = []
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["jsonl", "parquet"]
caps.supported_table_formats = ["delta"]
caps.escape_identifier = escape_databricks_identifier
# databricks identifiers are case insensitive and stored in lower case
# https://docs.databricks.com/en/sql/language-manual/sql-ref-identifiers.html
18 changes: 1 addition & 17 deletions dlt/normalize/validate.py
@@ -1,27 +1,11 @@
- from ast import List
- from typing import Optional
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.destination.utils import resolve_merge_strategy
from dlt.common.schema import Schema
from dlt.common.schema.typing import TTableSchema
- from dlt.common.schema.utils import find_incomplete_columns, get_validity_column_names
+ from dlt.common.schema.utils import find_incomplete_columns
from dlt.common.schema.exceptions import UnboundColumnException
from dlt.common import logger


- # def validate_validity_column_names(
- #     schema_name: str, validity_column_names: List[Optional[str]]
- # ) -> None:
- #     """Raises exception if configured validity column name appears in data item."""
- #     for validity_column_name in validity_column_names:
- #         if validity_column_name in item.keys():
- #             raise ColumnNameConflictException(
- #                 schema_name,
- #                 "Found column in data item with same name as validity column"
- #                 f' "{validity_column_name}".',
- #             )


def verify_normalized_table(
schema: Schema, table: TTableSchema, capabilities: DestinationCapabilitiesContext
) -> None:
1 change: 0 additions & 1 deletion dlt/pipeline/warnings.py
@@ -2,7 +2,6 @@
import warnings

from dlt.common.warnings import Dlt04DeprecationWarning
- from dlt.common.destination import Destination, TDestinationReferenceArg


def full_refresh_argument_deprecated(caller_name: str, full_refresh: t.Optional[bool]) -> None:
40 changes: 40 additions & 0 deletions tests/load/athena_iceberg/test_athena_iceberg.py
@@ -84,3 +84,43 @@ def items_iceberg():
def test_force_iceberg_deprecation(destination_config: DestinationTestConfiguration) -> None:
"""Fails on deprecated force_iceberg option"""
destination_config.force_iceberg = True
+ pipeline = destination_config.setup_pipeline("test_force_iceberg_deprecation", dev_mode=True)
+
+ def items() -> Iterator[Any]:
+     yield {
+         "id": 1,
+         "name": "item",
+         "sub_items": [{"id": 101, "name": "sub item 101"}, {"id": 101, "name": "sub item 102"}],
+     }
+
+ @dlt.resource(name="items_normal", write_disposition="append")
+ def items_normal():
+     yield from items()
+
+ @dlt.resource(name="items_hive", write_disposition="append", table_format="hive")
+ def items_hive():
+     yield from items()
+
+ print(pipeline.run([items_normal, items_hive]))
+
+ # items_normal should load as iceberg
+ # _dlt_pipeline_state should load as iceberg (IMPORTANT for backward comp)
+
+ with pipeline.sql_client() as client:
+     client.execute_sql("SELECT * FROM items_normal")
+     client.execute_sql("SELECT * FROM items_hive")
+
+     with pytest.raises(DatabaseTerminalException) as dbex:
+         client.execute_sql("UPDATE items_hive SET name='new name'")
+     assert "Modifying Hive table rows is only supported for transactional tables" in str(dbex)
+
+     # modifying iceberg table will succeed
+     client.execute_sql("UPDATE items_normal SET name='new name'")
+     client.execute_sql("UPDATE items_normal__sub_items SET name='super new name'")
+     client.execute_sql("UPDATE _dlt_pipeline_state SET pipeline_name='new name'")
+
+ # trigger deprecation warning
+ from dlt.destinations import athena
+
+ athena_c = athena(force_iceberg=True).configuration(athena().spec()._bind_dataset_name("ds"))
+ assert athena_c.force_iceberg is True
9 changes: 7 additions & 2 deletions tests/load/pipeline/test_arrow_loading.py
@@ -234,8 +234,13 @@ def test_load_arrow_with_not_null_columns(
item_type: TestDataItemFormat, destination_config: DestinationTestConfiguration
) -> None:
"""Resource schema contains non-nullable columns. Arrow schema should be written accordingly"""
-     if destination_config.destination == "databricks" and destination_config.file_format == "jsonl":
-         pytest.skip("databricks / json cannot load most of the types so we skip this test")
+     if (
+         destination_config.destination in ("databricks", "redshift")
+         and destination_config.file_format == "jsonl"
+     ):
+         pytest.skip(
+             "databricks + redshift / json cannot load most of the types so we skip this test"
+         )

item, records, _ = arrow_table_all_data_types(item_type, include_json=False, include_time=False)
