prepares for nested references #1774

Merged
merged 26 commits into from
Sep 9, 2024
Commits
f32a64e
adds methods to detect nested and root tables via parent hint
rudolfix Aug 31, 2024
6eb8d92
skips linking in relational when no parent hint, removes linking skip…
rudolfix Aug 31, 2024
4427c27
moves schema config and normalizer importers to schema module, breaks …
rudolfix Aug 31, 2024
b729905
adds table_format override to pipeline run
rudolfix Aug 31, 2024
034c769
resolves merge strategy using adapter, uses default for a destination…
rudolfix Aug 31, 2024
f176ede
removes force_iceberg flag from athena, requires explicit table_format
rudolfix Aug 31, 2024
e9b0879
adds PreparedTableSchema to indicate TTableSchemas that are prepared …
rudolfix Aug 31, 2024
12a37e0
applies table and file format to run methods in all pipeline tests
rudolfix Aug 31, 2024
4a974b4
shortens temp table names in sql jobs
rudolfix Sep 1, 2024
4cd3dfa
adds filesystem to drop command tests
rudolfix Sep 1, 2024
07b7ad9
fixes tests
rudolfix Sep 1, 2024
1cf1c98
adds method to update table from diff into extract
rudolfix Sep 1, 2024
310f1e2
athena iceberg does not create dlt pipeline state as iceberg by default
rudolfix Sep 1, 2024
8fb171a
other test fixes
rudolfix Sep 1, 2024
baa6f9b
deprecates force_iceberg, adds hive table format to opt out
rudolfix Sep 3, 2024
c2d35bf
merges column props and hints, categorizes column props
rudolfix Sep 8, 2024
1a8111d
moves type mappers into destination capabilities
rudolfix Sep 8, 2024
8e25bb3
Merge branch 'devel' into feat/prepares-for-nested-references
rudolfix Sep 8, 2024
6a19228
fixes tests
rudolfix Sep 8, 2024
21ed11c
fixes cap data types verification errors not being raised
rudolfix Sep 8, 2024
fc89ff1
adds missing deps
rudolfix Sep 8, 2024
7e7708c
fixes more tests
rudolfix Sep 8, 2024
200e480
allows precision and scale to be 0
rudolfix Sep 8, 2024
16d41c6
Merge branch 'devel' into feat/prepares-for-nested-references
rudolfix Sep 8, 2024
0508b11
fixes more tests
rudolfix Sep 8, 2024
f3654b9
corrects connectorx for 3.12
rudolfix Sep 9, 2024
11 changes: 2 additions & 9 deletions .github/workflows/test_common.yml
Expand Up @@ -17,7 +17,7 @@ env:

# we need the secrets only for the rest_api_pipeline tests which are in tests/sources
# so we inject them only at the end
DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}
SOURCES__GITHUB__ACCESS_TOKEN: ${{ secrets.GITHUB_TOKEN }}

jobs:
get_docs_changes:
Expand Down Expand Up @@ -126,15 +126,8 @@ jobs:
name: Run pipeline tests with pyarrow but no pandas installed Windows
shell: cmd

- name: create secrets.toml for examples
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- name: Install pipeline and sources dependencies
run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline -E deltalake -E sql_database

# TODO: this is needed for the filesystem tests, not sure if this should be in an extra?
- name: Install openpyxl for excel tests
run: poetry run pip install openpyxl
run: poetry install --no-interaction -E duckdb -E cli -E parquet -E deltalake -E sql_database --with sentry-sdk,pipeline,sources

- run: |
poetry run pytest tests/extract tests/pipeline tests/libs tests/cli/common tests/destinations tests/sources
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena.yml
Expand Up @@ -22,7 +22,7 @@ env:
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
ACTIVE_DESTINATIONS: "[\"athena\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-staging-iceberg\", \"athena-parquet-no-staging-iceberg\"]"
EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-iceberg-no-staging-iceberg\", \"athena-parquet-iceberg-staging-iceberg\"]"

jobs:
get_docs_changes:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/test_local_sources.yml
Expand Up @@ -81,12 +81,12 @@ jobs:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-sources

# TODO: which deps should we enable?
# TODO: which deps should we enable?
- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources

# run sources tests in load against configured destinations
- run: poetry run pytest tests/load/sources
- run: poetry run pytest tests/load/sources
name: Run tests Linux
env:
DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -44,7 +44,7 @@ has-poetry:
poetry --version

dev: has-poetry
poetry install --all-extras --with airflow,docs,providers,pipeline,sentry-sdk,dbt
poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk

lint:
./tools/check-package.sh
Expand Down
8 changes: 7 additions & 1 deletion dlt/common/configuration/container.py
@@ -1,7 +1,7 @@
from contextlib import contextmanager, nullcontext, AbstractContextManager
import re
import threading
from typing import ClassVar, Dict, Iterator, Tuple, Type, TypeVar, Any
from typing import ClassVar, Dict, Iterator, Optional, Tuple, Type, TypeVar, Any

from dlt.common.configuration.specs.base_configuration import ContainerInjectableContext
from dlt.common.configuration.exceptions import (
Expand Down Expand Up @@ -171,6 +171,12 @@ def injectable_context(
# value was modified in the meantime and not restored
raise ContainerInjectableContextMangled(spec, context[spec], config)

def get(self, spec: Type[TConfiguration]) -> Optional[TConfiguration]:
try:
return self[spec]
except KeyError:
return None

@staticmethod
def thread_pool_prefix() -> str:
"""Creates a container friendly pool prefix that contains starting thread id. Container implementation will automatically use it
Expand Down
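Note on the new `Container.get`: it gives the container a dict-style lookup that returns `None` instead of raising. A minimal sketch of that shape follows. `MiniContainer` and `FakeCaps` are hypothetical stand-ins, not dlt classes, and the real `__getitem__` may do more than a plain lookup (that part is not visible in this hunk).

```python
from typing import Dict, Optional, Type, TypeVar

T = TypeVar("T")


class MiniContainer:
    """Hypothetical stand-in: maps a spec type to the instance registered for it."""

    def __init__(self) -> None:
        self._contexts: Dict[type, object] = {}

    def __setitem__(self, spec: Type[T], value: T) -> None:
        self._contexts[spec] = value

    def __getitem__(self, spec: Type[T]) -> T:
        # Raises KeyError when nothing was registered for `spec`.
        return self._contexts[spec]  # type: ignore[return-value]

    def get(self, spec: Type[T]) -> Optional[T]:
        # Same shape as the method added in the diff: swallow KeyError, return None.
        try:
            return self[spec]
        except KeyError:
            return None


class FakeCaps:
    """Hypothetical context class used only for this illustration."""


container = MiniContainer()
missing = container.get(FakeCaps)   # nothing registered yet
container[FakeCaps] = FakeCaps()
present = container.get(FakeCaps)   # now returns the registered instance
```

Callers can then branch on `None` rather than wrapping every lookup in `try`/`except`.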
1 change: 1 addition & 0 deletions dlt/common/data_writers/exceptions.py
@@ -1,4 +1,5 @@
from typing import NamedTuple, Sequence

from dlt.common.destination import TLoaderFileFormat
from dlt.common.exceptions import DltException

Expand Down
7 changes: 4 additions & 3 deletions dlt/common/data_writers/writers.py
Expand Up @@ -32,10 +32,11 @@
from dlt.common.destination import (
DestinationCapabilitiesContext,
TLoaderFileFormat,
ALL_SUPPORTED_FILE_FORMATS,
LOADER_FILE_FORMATS,
)
from dlt.common.metrics import DataWriterMetrics
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.schema.utils import is_nullable_column
from dlt.common.typing import StrAny, TDataItem


Expand Down Expand Up @@ -115,7 +116,7 @@ def item_format_from_file_extension(cls, extension: str) -> TDataItemFormat:
elif extension == "parquet":
return "arrow"
# those files may be imported by normalizer as is
elif extension in ALL_SUPPORTED_FILE_FORMATS:
elif extension in LOADER_FILE_FORMATS:
return "file"
else:
raise ValueError(f"Cannot figure out data item format for extension {extension}")
Expand Down Expand Up @@ -331,7 +332,7 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
self._caps,
self.timestamp_timezone,
),
nullable=schema_item.get("nullable", True),
nullable=is_nullable_column(schema_item),
)
for name, schema_item in columns_schema.items()
]
Expand Down
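Note on `is_nullable_column`: the hunk swaps an inline `schema_item.get("nullable", True)` for a shared helper. The body of the helper is not shown in this diff, so the sketch below only assumes it keeps the same default (a column with no `nullable` hint counts as nullable). The name `is_nullable_column_sketch` is hypothetical.

```python
from typing import Any, Dict


def is_nullable_column_sketch(column: Dict[str, Any]) -> bool:
    # Assumption: same default as the replaced inline expression,
    # i.e. a missing "nullable" hint means the column is nullable.
    return column.get("nullable", True)


no_hint = is_nullable_column_sketch({"name": "id", "data_type": "bigint"})
not_null = is_nullable_column_sketch({"name": "id", "nullable": False})
```

Centralizing the default in one function means writers and other callers cannot drift apart on what a missing hint means.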
6 changes: 4 additions & 2 deletions dlt/common/destination/__init__.py
Expand Up @@ -2,15 +2,17 @@
DestinationCapabilitiesContext,
merge_caps_file_formats,
TLoaderFileFormat,
ALL_SUPPORTED_FILE_FORMATS,
LOADER_FILE_FORMATS,
)
from dlt.common.destination.reference import TDestinationReferenceArg, Destination, TDestination
from dlt.common.destination.typing import PreparedTableSchema

__all__ = [
"DestinationCapabilitiesContext",
"merge_caps_file_formats",
"TLoaderFileFormat",
"ALL_SUPPORTED_FILE_FORMATS",
"LOADER_FILE_FORMATS",
"PreparedTableSchema",
"TDestinationReferenceArg",
"Destination",
"TDestination",
Expand Down
115 changes: 98 additions & 17 deletions dlt/common/destination/capabilities.py
@@ -1,15 +1,20 @@
from abc import ABC, abstractmethod
from typing import (
Any,
Callable,
ClassVar,
Iterable,
Literal,
Optional,
Sequence,
Tuple,
Set,
Protocol,
Type,
get_args,
)
from dlt.common.data_types import TDataType
from dlt.common.exceptions import TerminalValueError
from dlt.common.normalizers.typing import TNamingConventionReferenceArg
from dlt.common.typing import TLoaderFileFormat
from dlt.common.configuration.utils import serialize_value
Expand All @@ -20,36 +25,109 @@
DestinationLoadingViaStagingNotSupported,
DestinationLoadingWithoutStagingNotSupported,
)
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.schema.typing import (
TColumnSchema,
TColumnType,
TTableSchema,
TLoaderMergeStrategy,
TTableFormat,
)
from dlt.common.wei import EVM_DECIMAL_PRECISION

TLoaderParallelismStrategy = Literal["parallel", "table-sequential", "sequential"]
ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))
LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))


class LoaderFileFormatAdapter(Protocol):
"""Callback protocol for `loader_file_format_adapter` capability."""
class LoaderFileFormatSelector(Protocol):
"""Selects preferred and supported file formats for a given table schema"""

@staticmethod
def __call__(
self,
preferred_loader_file_format: TLoaderFileFormat,
supported_loader_file_formats: Sequence[TLoaderFileFormat],
/,
*,
table_schema: "TTableSchema", # type: ignore[name-defined] # noqa: F821
table_schema: TTableSchema,
) -> Tuple[TLoaderFileFormat, Sequence[TLoaderFileFormat]]: ...


class MergeStrategySelector(Protocol):
"""Selects right set of merge strategies for a given table schema"""

@staticmethod
def __call__(
supported_merge_strategies: Sequence[TLoaderMergeStrategy],
/,
*,
table_schema: TTableSchema,
) -> Sequence["TLoaderMergeStrategy"]: ...


class DataTypeMapper(ABC):
def __init__(self, capabilities: "DestinationCapabilitiesContext") -> None:
"""Maps dlt data types into destination data types"""
self.capabilities = capabilities

@abstractmethod
def to_destination_type(self, column: TColumnSchema, table: PreparedTableSchema) -> str:
"""Gets destination data type for a particular `column` in prepared `table`"""
pass

@abstractmethod
def from_destination_type(
self, db_type: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
"""Gets column type from db type"""
pass

@abstractmethod
def ensure_supported_type(
self,
column: TColumnSchema,
table: PreparedTableSchema,
loader_file_format: TLoaderFileFormat,
) -> None:
"""Makes sure that dlt type in `column` in prepared `table` is supported by the destination for a given file format"""
pass


class UnsupportedTypeMapper(DataTypeMapper):
"""Type Mapper that can't map any type"""

def to_destination_type(self, column: TColumnSchema, table: PreparedTableSchema) -> str:
raise NotImplementedError("No types are supported, use real type mapper")

def from_destination_type(
self, db_type: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
raise NotImplementedError("No types are supported, use real type mapper")

def ensure_supported_type(
self,
column: TColumnSchema,
table: PreparedTableSchema,
loader_file_format: TLoaderFileFormat,
) -> None:
raise TerminalValueError(
"No types are supported, use real type mapper", column["data_type"]
)
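
Note on `DataTypeMapper`: a concrete mapper has to implement all three abstract methods. The sketch below is self-contained and does not import dlt. `DataTypeMapperSketch`, `ToyTypeMapper`, the plain-dict column/table types, and the type names in the mapping table are all assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

# Stand-ins for TColumnSchema / TColumnType / PreparedTableSchema (assumption: plain dicts).
Column = Dict[str, Any]
Table = Dict[str, Any]


class DataTypeMapperSketch(ABC):
    """Mirrors the three abstract methods shown in the diff, without importing dlt."""

    def __init__(self, capabilities: Any) -> None:
        self.capabilities = capabilities

    @abstractmethod
    def to_destination_type(self, column: Column, table: Table) -> str: ...

    @abstractmethod
    def from_destination_type(
        self, db_type: str, precision: Optional[int], scale: Optional[int]
    ) -> Column: ...

    @abstractmethod
    def ensure_supported_type(
        self, column: Column, table: Table, loader_file_format: str
    ) -> None: ...


class ToyTypeMapper(DataTypeMapperSketch):
    """Hypothetical mapper for an imaginary SQL destination."""

    TO_DB = {"text": "VARCHAR", "bigint": "BIGINT", "bool": "BOOLEAN"}
    FROM_DB = {db: dlt_type for dlt_type, db in TO_DB.items()}

    def to_destination_type(self, column: Column, table: Table) -> str:
        return self.TO_DB[column["data_type"]]

    def from_destination_type(self, db_type, precision, scale) -> Column:
        column: Column = {"data_type": self.FROM_DB[db_type]}
        # Compare against None so that a precision or scale of 0 is kept.
        if precision is not None:
            column["precision"] = precision
        if scale is not None:
            column["scale"] = scale
        return column

    def ensure_supported_type(self, column, table, loader_file_format) -> None:
        if column["data_type"] not in self.TO_DB:
            raise ValueError(f"unsupported data type: {column['data_type']}")


# The capabilities object stores the mapper *class*; get_type_mapper() in the diff
# instantiates it with the capabilities themselves. `None` stands in for them here.
mapper_class = ToyTypeMapper
mapper = mapper_class(None)
```

Storing the class rather than an instance lets each capabilities object build a mapper bound to itself, which is what `self.type_mapper(self)` does later in this file.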


@configspec
class DestinationCapabilitiesContext(ContainerInjectableContext):
"""Injectable destination capabilities required for many Pipeline stages, e.g. normalize"""

# do not allow to create default value, destination caps must be always explicitly inserted into container
can_create_default: ClassVar[bool] = False

preferred_loader_file_format: TLoaderFileFormat = None
supported_loader_file_formats: Sequence[TLoaderFileFormat] = None
loader_file_format_adapter: LoaderFileFormatAdapter = None
loader_file_format_selector: LoaderFileFormatSelector = None
"""Callable that adapts `preferred_loader_file_format` and `supported_loader_file_formats` at runtime."""
supported_table_formats: Sequence["TTableFormat"] = None # type: ignore[name-defined] # noqa: F821
supported_table_formats: Sequence[TTableFormat] = None
type_mapper: Optional[Type[DataTypeMapper]] = None
recommended_file_size: Optional[int] = None
"""Recommended file size in bytes when writing extract/load files"""
preferred_staging_file_format: Optional[TLoaderFileFormat] = None
Expand Down Expand Up @@ -89,14 +167,12 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
max_table_nesting: Optional[int] = None
"""Allows a destination to overwrite max_table_nesting from source"""

supported_merge_strategies: Sequence["TLoaderMergeStrategy"] = None # type: ignore[name-defined] # noqa: F821
supported_merge_strategies: Sequence[TLoaderMergeStrategy] = None
merge_strategies_selector: MergeStrategySelector = None
# TODO: also add `supported_replace_strategies` capability

# do not allow to create default value, destination caps must be always explicitly inserted into container
can_create_default: ClassVar[bool] = False

max_parallel_load_jobs: Optional[int] = None
"""The destination can set the maxium amount of parallel load jobs being executed"""
"""The destination can set the maximum amount of parallel load jobs being executed"""
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None
"""The destination can override the parallelism strategy"""

Expand All @@ -109,16 +185,17 @@ def generates_case_sensitive_identifiers(self) -> bool:
def generic_capabilities(
preferred_loader_file_format: TLoaderFileFormat = None,
naming_convention: TNamingConventionReferenceArg = None,
loader_file_format_adapter: LoaderFileFormatAdapter = None,
supported_table_formats: Sequence["TTableFormat"] = None, # type: ignore[name-defined] # noqa: F821
supported_merge_strategies: Sequence["TLoaderMergeStrategy"] = None, # type: ignore[name-defined] # noqa: F821
loader_file_format_selector: LoaderFileFormatSelector = None,
supported_table_formats: Sequence[TTableFormat] = None,
supported_merge_strategies: Sequence[TLoaderMergeStrategy] = None,
merge_strategies_selector: MergeStrategySelector = None,
) -> "DestinationCapabilitiesContext":
from dlt.common.data_writers.escape import format_datetime_literal

caps = DestinationCapabilitiesContext()
caps.preferred_loader_file_format = preferred_loader_file_format
caps.supported_loader_file_formats = ["jsonl", "insert_values", "parquet", "csv"]
caps.loader_file_format_adapter = loader_file_format_adapter
caps.loader_file_format_selector = loader_file_format_selector
caps.preferred_staging_file_format = None
caps.supported_staging_file_formats = []
caps.naming_convention = naming_convention or caps.naming_convention
Expand All @@ -140,8 +217,12 @@ def generic_capabilities(
caps.supports_transactions = True
caps.supports_multiple_statements = True
caps.supported_merge_strategies = supported_merge_strategies or []
caps.merge_strategies_selector = merge_strategies_selector
return caps

def get_type_mapper(self) -> DataTypeMapper:
return self.type_mapper(self)


def merge_caps_file_formats(
destination: str,
Expand Down
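Note on the two selector protocols: both take the destination-wide defaults positionally and the table schema as a keyword, and return a per-table answer. A minimal sketch of callables with those call shapes follows. The function names, the `"delta"` rule, and the strategy names are hypothetical and only illustrate the signatures; table schemas are plain dicts here.

```python
from typing import Any, Dict, Sequence, Tuple

TableSchema = Dict[str, Any]  # stand-in for TTableSchema (assumption: plain dict)


def toy_file_format_selector(
    preferred_loader_file_format: str,
    supported_loader_file_formats: Sequence[str],
    /,
    *,
    table_schema: TableSchema,
) -> Tuple[str, Sequence[str]]:
    # Hypothetical rule: tables in "delta" format may only be loaded from parquet.
    if table_schema.get("table_format") == "delta":
        return "parquet", ["parquet"]
    return preferred_loader_file_format, supported_loader_file_formats


def toy_merge_strategies_selector(
    supported_merge_strategies: Sequence[str],
    /,
    *,
    table_schema: TableSchema,
) -> Sequence[str]:
    # Hypothetical rule: "delta" tables support only one of the strategies.
    if table_schema.get("table_format") == "delta":
        return [s for s in supported_merge_strategies if s == "upsert"]
    return supported_merge_strategies


defaults = ("jsonl", ["jsonl", "parquet", "csv"])
delta_formats = toy_file_format_selector(*defaults, table_schema={"table_format": "delta"})
plain_formats = toy_file_format_selector(*defaults, table_schema={"name": "events"})
delta_merge = toy_merge_strategies_selector(
    ["delete-insert", "upsert"], table_schema={"table_format": "delta"}
)
```

The positional-only marker (`/`) and keyword-only marker (`*`) match the protocol signatures in the diff, so a selector cannot be called with the defaults and the table schema swapped.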
33 changes: 32 additions & 1 deletion dlt/common/destination/exceptions.py
@@ -1,4 +1,4 @@
from typing import Any, Iterable, List
from typing import Any, Iterable, List, Sequence

from dlt.common.exceptions import DltException, TerminalException, TransientException

Expand Down Expand Up @@ -102,6 +102,37 @@ def __init__(
)


class UnsupportedDataType(DestinationTerminalException):
def __init__(
self,
destination_type: str,
table_name: str,
column: str,
data_type: str,
file_format: str,
available_in_formats: Sequence[str],
more_info: str,
) -> None:
self.destination_type = destination_type
self.table_name = table_name
self.column = column
self.data_type = data_type
self.file_format = file_format
self.available_in_formats = available_in_formats
self.more_info = more_info
msg = (
f"Destination {destination_type} cannot load data type '{data_type}' from"
f" '{file_format}' files. The affected table is '{table_name}' column '{column}'."
)
if available_in_formats:
msg += f" Note: '{data_type}' can be loaded from {available_in_formats} format(s)."
else:
msg += f" None of available file formats support '{data_type}' for this destination."
if more_info:
msg += " More info: " + more_info
super().__init__(msg)


class DestinationHasFailedJobs(DestinationTerminalException):
def __init__(self, destination_name: str, load_id: str, failed_jobs: List[Any]) -> None:
self.destination_name = destination_name
Expand Down
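Note on `UnsupportedDataType`: the message is assembled in three steps (base text, a hint about other formats or a statement that none exist, then optional extra info). The sketch below reproduces only that branching with simplified wording. `unsupported_type_message` is a hypothetical helper, not part of dlt, and the sample arguments are made up.

```python
from typing import Sequence


def unsupported_type_message(
    data_type: str,
    file_format: str,
    available_in_formats: Sequence[str],
    more_info: str,
) -> str:
    # Base text is always present.
    msg = f"cannot load '{data_type}' from '{file_format}' files."
    # Either point at formats that do work, or say that none do.
    if available_in_formats:
        msg += f" Can be loaded from: {list(available_in_formats)}."
    else:
        msg += " No available file format supports this type."
    # Extra context is appended only when provided.
    if more_info:
        msg += " More info: " + more_info
    return msg


with_alternative = unsupported_type_message("time", "jsonl", ["parquet"], "")
without_alternative = unsupported_type_message("time", "jsonl", [], "see destination docs")
```

Keeping the individual fields on the exception, as the diff does, lets callers react programmatically instead of parsing the message.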