Skip to content

Commit

Permalink
prepares for nested references (#1774)
Browse files Browse the repository at this point in the history
* adds methods to detect nested and root tables via parent hint

* skips linking in relational when no parent hint, removes linking skip for primary keys

* moves schema config and normalizer importers to schema module, breaks cyclic deps with dest capabilities

* adds table_format override to pipeline run

* resolves merge strategy using adapter, uses default for a destination if strategy not explicit

* removes force_iceberg flag from athena, requires explicit table_format

* adds PreparedTableSchema to indicate TTableSchemas that are prepared for loading, makes verify_schema explicit method to be called by load, simplifies methods to prepare tables

* applies table and file format to run methods in all pipeline tests

* shortens temp table names in sql jobs

* adds filesystem to drop command tests

* fixes tests

* adds method to update table from diff into extract

* athena iceberg does not create dlt pipeline state as iceberg by default

* other test fixes

* deprecates force_iceberg, adds hive table format to opt out

* merges column props and hints, categorizes column props

* moves type mappers into destination capabilities

* fixes tests

* fixes cap data types verification errors not being raised

* adds missing deps

* fixes more tests

* allows precision and scale to be 0

* fixes more tests

* corrects connectorx for 3.12
  • Loading branch information
rudolfix authored Sep 9, 2024
1 parent 84f9fa7 commit dad2a08
Show file tree
Hide file tree
Showing 137 changed files with 3,946 additions and 2,794 deletions.
11 changes: 2 additions & 9 deletions .github/workflows/test_common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ env:

# we need the secrets only for the rest_api_pipeline tests which are in tests/sources
# so we inject them only at the end
DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}
SOURCES__GITHUB__ACCESS_TOKEN: ${{ secrets.GITHUB_TOKEN }}

jobs:
get_docs_changes:
Expand Down Expand Up @@ -126,15 +126,8 @@ jobs:
name: Run pipeline tests with pyarrow but no pandas installed Windows
shell: cmd
- name: create secrets.toml for examples
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- name: Install pipeline and sources dependencies
run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline -E deltalake -E sql_database

# TODO: this is needed for the filesystem tests, not sure if this should be in an extra?
- name: Install openpyxl for excel tests
run: poetry run pip install openpyxl
run: poetry install --no-interaction -E duckdb -E cli -E parquet -E deltalake -E sql_database --with sentry-sdk,pipeline,sources

- run: |
poetry run pytest tests/extract tests/pipeline tests/libs tests/cli/common tests/destinations tests/sources
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ env:
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
ACTIVE_DESTINATIONS: "[\"athena\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-staging-iceberg\", \"athena-parquet-no-staging-iceberg\"]"
EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-iceberg-no-staging-iceberg\", \"athena-parquet-iceberg-staging-iceberg\"]"

jobs:
get_docs_changes:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/test_local_sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ jobs:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-sources

# TODO: which deps should we enable?
# TODO: which deps should we enable?
- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources

# run sources tests in load against configured destinations
- run: poetry run pytest tests/load/sources
- run: poetry run pytest tests/load/sources
name: Run tests Linux
env:
DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ has-poetry:
poetry --version

dev: has-poetry
poetry install --all-extras --with airflow,docs,providers,pipeline,sentry-sdk,dbt
poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk

lint:
./tools/check-package.sh
Expand Down
8 changes: 7 additions & 1 deletion dlt/common/configuration/container.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from contextlib import contextmanager, nullcontext, AbstractContextManager
import re
import threading
from typing import ClassVar, Dict, Iterator, Tuple, Type, TypeVar, Any
from typing import ClassVar, Dict, Iterator, Optional, Tuple, Type, TypeVar, Any

from dlt.common.configuration.specs.base_configuration import ContainerInjectableContext
from dlt.common.configuration.exceptions import (
Expand Down Expand Up @@ -171,6 +171,12 @@ def injectable_context(
# value was modified in the meantime and not restored
raise ContainerInjectableContextMangled(spec, context[spec], config)

def get(self, spec: Type[TConfiguration]) -> Optional[TConfiguration]:
    """Returns the value stored under `spec`, or None when the item lookup raises KeyError"""
    try:
        context = self[spec]
    except KeyError:
        # a failed lookup is reported to the caller as a missing value, not as an error
        return None
    else:
        return context

@staticmethod
def thread_pool_prefix() -> str:
"""Creates a container friendly pool prefix that contains starting thread id. Container implementation will automatically use it
Expand Down
1 change: 1 addition & 0 deletions dlt/common/data_writers/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import NamedTuple, Sequence

from dlt.common.destination import TLoaderFileFormat
from dlt.common.exceptions import DltException

Expand Down
7 changes: 4 additions & 3 deletions dlt/common/data_writers/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
from dlt.common.destination import (
DestinationCapabilitiesContext,
TLoaderFileFormat,
ALL_SUPPORTED_FILE_FORMATS,
LOADER_FILE_FORMATS,
)
from dlt.common.metrics import DataWriterMetrics
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.schema.utils import is_nullable_column
from dlt.common.typing import StrAny, TDataItem


Expand Down Expand Up @@ -115,7 +116,7 @@ def item_format_from_file_extension(cls, extension: str) -> TDataItemFormat:
elif extension == "parquet":
return "arrow"
# those files may be imported by normalizer as is
elif extension in ALL_SUPPORTED_FILE_FORMATS:
elif extension in LOADER_FILE_FORMATS:
return "file"
else:
raise ValueError(f"Cannot figure out data item format for extension {extension}")
Expand Down Expand Up @@ -331,7 +332,7 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
self._caps,
self.timestamp_timezone,
),
nullable=schema_item.get("nullable", True),
nullable=is_nullable_column(schema_item),
)
for name, schema_item in columns_schema.items()
]
Expand Down
6 changes: 4 additions & 2 deletions dlt/common/destination/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
DestinationCapabilitiesContext,
merge_caps_file_formats,
TLoaderFileFormat,
ALL_SUPPORTED_FILE_FORMATS,
LOADER_FILE_FORMATS,
)
from dlt.common.destination.reference import TDestinationReferenceArg, Destination, TDestination
from dlt.common.destination.typing import PreparedTableSchema

__all__ = [
"DestinationCapabilitiesContext",
"merge_caps_file_formats",
"TLoaderFileFormat",
"ALL_SUPPORTED_FILE_FORMATS",
"LOADER_FILE_FORMATS",
"PreparedTableSchema",
"TDestinationReferenceArg",
"Destination",
"TDestination",
Expand Down
115 changes: 98 additions & 17 deletions dlt/common/destination/capabilities.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from abc import ABC, abstractmethod
from typing import (
Any,
Callable,
ClassVar,
Iterable,
Literal,
Optional,
Sequence,
Tuple,
Set,
Protocol,
Type,
get_args,
)
from dlt.common.data_types import TDataType
from dlt.common.exceptions import TerminalValueError
from dlt.common.normalizers.typing import TNamingConventionReferenceArg
from dlt.common.typing import TLoaderFileFormat
from dlt.common.configuration.utils import serialize_value
Expand All @@ -20,36 +25,109 @@
DestinationLoadingViaStagingNotSupported,
DestinationLoadingWithoutStagingNotSupported,
)
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.schema.typing import (
TColumnSchema,
TColumnType,
TTableSchema,
TLoaderMergeStrategy,
TTableFormat,
)
from dlt.common.wei import EVM_DECIMAL_PRECISION

TLoaderParallelismStrategy = Literal["parallel", "table-sequential", "sequential"]
ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))
LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))


class LoaderFileFormatAdapter(Protocol):
"""Callback protocol for `loader_file_format_adapter` capability."""
class LoaderFileFormatSelector(Protocol):
"""Selects preferred and supported file formats for a given table schema"""

@staticmethod
def __call__(
self,
preferred_loader_file_format: TLoaderFileFormat,
supported_loader_file_formats: Sequence[TLoaderFileFormat],
/,
*,
table_schema: "TTableSchema", # type: ignore[name-defined] # noqa: F821
table_schema: TTableSchema,
) -> Tuple[TLoaderFileFormat, Sequence[TLoaderFileFormat]]: ...


class MergeStrategySelector(Protocol):
    """Callback protocol for the `merge_strategies_selector` capability.

    Selects right set of merge strategies for a given table schema.
    """

    @staticmethod
    def __call__(
        supported_merge_strategies: Sequence[TLoaderMergeStrategy],
        /,
        *,
        table_schema: TTableSchema,
    ) -> Sequence["TLoaderMergeStrategy"]:
        """Takes the supported merge strategies positionally and `table_schema` by keyword,
        returns a sequence of merge strategies.
        """
        ...


class DataTypeMapper(ABC):
    """Maps dlt data types into destination data types.

    Abstract base class: subclasses implement conversion in both directions and
    a check that a column type is supported for a given loader file format.
    """

    def __init__(self, capabilities: "DestinationCapabilitiesContext") -> None:
        """Keeps a reference to `capabilities` on the instance"""
        self.capabilities = capabilities

    @abstractmethod
    def to_destination_type(self, column: TColumnSchema, table: PreparedTableSchema) -> str:
        """Gets destination data type for a particular `column` in prepared `table`"""

    @abstractmethod
    def from_destination_type(
        self, db_type: str, precision: Optional[int], scale: Optional[int]
    ) -> TColumnType:
        """Gets column type from db type, given optional `precision` and `scale`"""

    @abstractmethod
    def ensure_supported_type(
        self,
        column: TColumnSchema,
        table: PreparedTableSchema,
        loader_file_format: TLoaderFileFormat,
    ) -> None:
        """Makes sure that dlt type in `column` in prepared `table` is supported by the destination for a given file format"""


class UnsupportedTypeMapper(DataTypeMapper):
    """Type Mapper that can't map any type.

    Both conversion methods raise NotImplementedError, while `ensure_supported_type`
    raises TerminalValueError carrying the data type of the checked column.
    """

    def to_destination_type(self, column: TColumnSchema, table: PreparedTableSchema) -> str:
        """Always raises NotImplementedError"""
        message = "No types are supported, use real type mapper"
        raise NotImplementedError(message)

    def from_destination_type(
        self, db_type: str, precision: Optional[int], scale: Optional[int]
    ) -> TColumnType:
        """Always raises NotImplementedError"""
        message = "No types are supported, use real type mapper"
        raise NotImplementedError(message)

    def ensure_supported_type(
        self,
        column: TColumnSchema,
        table: PreparedTableSchema,
        loader_file_format: TLoaderFileFormat,
    ) -> None:
        """Always raises TerminalValueError with the `data_type` of `column` as the offending value"""
        unsupported_type = column["data_type"]
        raise TerminalValueError("No types are supported, use real type mapper", unsupported_type)


@configspec
class DestinationCapabilitiesContext(ContainerInjectableContext):
"""Injectable destination capabilities required for many Pipeline stages ie. normalize"""

# do not allow to create default value, destination caps must be always explicitly inserted into container
can_create_default: ClassVar[bool] = False

preferred_loader_file_format: TLoaderFileFormat = None
supported_loader_file_formats: Sequence[TLoaderFileFormat] = None
loader_file_format_adapter: LoaderFileFormatAdapter = None
loader_file_format_selector: LoaderFileFormatSelector = None
"""Callable that adapts `preferred_loader_file_format` and `supported_loader_file_formats` at runtime."""
supported_table_formats: Sequence["TTableFormat"] = None # type: ignore[name-defined] # noqa: F821
supported_table_formats: Sequence[TTableFormat] = None
type_mapper: Optional[Type[DataTypeMapper]] = None
recommended_file_size: Optional[int] = None
"""Recommended file size in bytes when writing extract/load files"""
preferred_staging_file_format: Optional[TLoaderFileFormat] = None
Expand Down Expand Up @@ -89,14 +167,12 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
max_table_nesting: Optional[int] = None
"""Allows a destination to overwrite max_table_nesting from source"""

supported_merge_strategies: Sequence["TLoaderMergeStrategy"] = None # type: ignore[name-defined] # noqa: F821
supported_merge_strategies: Sequence[TLoaderMergeStrategy] = None
merge_strategies_selector: MergeStrategySelector = None
# TODO: also add `supported_replace_strategies` capability

# do not allow to create default value, destination caps must be always explicitly inserted into container
can_create_default: ClassVar[bool] = False

max_parallel_load_jobs: Optional[int] = None
"""The destination can set the maxium amount of parallel load jobs being executed"""
"""The destination can set the maximum amount of parallel load jobs being executed"""
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None
"""The destination can override the parallelism strategy"""

Expand All @@ -109,16 +185,17 @@ def generates_case_sensitive_identifiers(self) -> bool:
def generic_capabilities(
preferred_loader_file_format: TLoaderFileFormat = None,
naming_convention: TNamingConventionReferenceArg = None,
loader_file_format_adapter: LoaderFileFormatAdapter = None,
supported_table_formats: Sequence["TTableFormat"] = None, # type: ignore[name-defined] # noqa: F821
supported_merge_strategies: Sequence["TLoaderMergeStrategy"] = None, # type: ignore[name-defined] # noqa: F821
loader_file_format_selector: LoaderFileFormatSelector = None,
supported_table_formats: Sequence[TTableFormat] = None,
supported_merge_strategies: Sequence[TLoaderMergeStrategy] = None,
merge_strategies_selector: MergeStrategySelector = None,
) -> "DestinationCapabilitiesContext":
from dlt.common.data_writers.escape import format_datetime_literal

caps = DestinationCapabilitiesContext()
caps.preferred_loader_file_format = preferred_loader_file_format
caps.supported_loader_file_formats = ["jsonl", "insert_values", "parquet", "csv"]
caps.loader_file_format_adapter = loader_file_format_adapter
caps.loader_file_format_selector = loader_file_format_selector
caps.preferred_staging_file_format = None
caps.supported_staging_file_formats = []
caps.naming_convention = naming_convention or caps.naming_convention
Expand All @@ -140,8 +217,12 @@ def generic_capabilities(
caps.supports_transactions = True
caps.supports_multiple_statements = True
caps.supported_merge_strategies = supported_merge_strategies or []
caps.merge_strategies_selector = merge_strategies_selector
return caps

def get_type_mapper(self) -> DataTypeMapper:
    """Instantiates the class stored in `type_mapper`, passing these capabilities to it"""
    mapper_class = self.type_mapper
    return mapper_class(self)


def merge_caps_file_formats(
destination: str,
Expand Down
33 changes: 32 additions & 1 deletion dlt/common/destination/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Iterable, List
from typing import Any, Iterable, List, Sequence

from dlt.common.exceptions import DltException, TerminalException, TransientException

Expand Down Expand Up @@ -102,6 +102,37 @@ def __init__(
)


class UnsupportedDataType(DestinationTerminalException):
    """Reports that a destination cannot load a data type from files of a given format.

    All constructor arguments are kept as attributes of the same name. The exception
    message names the destination, data type, file format, table and column. It then
    lists the formats in `available_in_formats` if there are any, otherwise it states
    that no available file format supports the type. A non-empty `more_info` is
    appended at the end.
    """

    def __init__(
        self,
        destination_type: str,
        table_name: str,
        column: str,
        data_type: str,
        file_format: str,
        available_in_formats: Sequence[str],
        more_info: str,
    ) -> None:
        self.destination_type = destination_type
        self.table_name = table_name
        self.column = column
        self.data_type = data_type
        self.file_format = file_format
        self.available_in_formats = available_in_formats
        self.more_info = more_info
        msg = (
            f"Destination {destination_type} cannot load data type '{data_type}' from"
            f" '{file_format}' files. The affected table is '{table_name}' column '{column}'."
        )
        if available_in_formats:
            # fixed typo in the message: was "formats(s)"
            msg += f" Note: '{data_type}' can be loaded from {available_in_formats} format(s)."
        else:
            msg += f" None of available file formats support '{data_type}' for this destination."
        if more_info:
            msg += " More info: " + more_info
        super().__init__(msg)


class DestinationHasFailedJobs(DestinationTerminalException):
def __init__(self, destination_name: str, load_id: str, failed_jobs: List[Any]) -> None:
self.destination_name = destination_name
Expand Down
Loading

0 comments on commit dad2a08

Please sign in to comment.