Add Delta table support for filesystem destination #1382

Merged on Jun 6, 2024 (50 commits)
Changes from 37 commits
Commits
683b35c
add delta table support for filesystem destination
May 17, 2024
a650de7
Merge branch 'refs/heads/devel' into 978-filesystem-delta-table
May 18, 2024
d66cbb2
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 978-files…
May 18, 2024
6e3dced
remove duplicate method definition
May 18, 2024
b241e8c
make property robust
May 18, 2024
10185df
exclude high-precision decimal columns
May 18, 2024
574215f
make delta imports conditional
May 18, 2024
ae03815
include pyarrow in deltalake dependency
May 18, 2024
88cbfcf
install extra deltalake dependency
May 18, 2024
b83ca8b
disable high precision decimal arrow test columns by default
May 19, 2024
b8d2967
include arrow max precision decimal column
May 19, 2024
7a38470
introduce directory job and refactor delta table code
jorritsandbrink May 26, 2024
418d8a8
refactor delta table load
jorritsandbrink May 29, 2024
fad4ff0
revert import changes
jorritsandbrink May 29, 2024
8134aab
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 978-files…
jorritsandbrink May 30, 2024
91716df
add delta table format child table handling
jorritsandbrink May 30, 2024
8bdb93f
make table_format key lookups robust
jorritsandbrink May 30, 2024
0a32c44
write remote path to reference file
jorritsandbrink May 30, 2024
0fd7e3e
add supported table formats and file format adapter to destination ca…
jorritsandbrink May 31, 2024
e9282ea
remove jsonl and parquet from table formats
jorritsandbrink May 31, 2024
c87d68e
add object_store rust crate credentials handling
jorritsandbrink Jun 2, 2024
1e341cf
add deltalake_storage_options to filesystem config
jorritsandbrink Jun 2, 2024
2e04cff
move function to top level to prevent multiprocessing pickle error
jorritsandbrink Jun 3, 2024
0240c39
add new deltalake_storage_options filesystem config key to tests
jorritsandbrink Jun 3, 2024
f47de39
replace secrets with dummy values in test
jorritsandbrink Jun 3, 2024
1d9b968
reorganize object_store rust crate credentials tests
jorritsandbrink Jun 3, 2024
12e03ec
add delta table format docs
jorritsandbrink Jun 3, 2024
681ae48
move delta table logical delete logic to filesystem client
jorritsandbrink Jun 4, 2024
83745fa
rename pyarrow lib method names
jorritsandbrink Jun 4, 2024
72553d6
rename utils to delta_utils
jorritsandbrink Jun 4, 2024
3f41402
import pyarrow from dlt common libs
jorritsandbrink Jun 4, 2024
a49b23d
move delta lake utitilities to module in dlt common libs
jorritsandbrink Jun 4, 2024
5b9071f
import delta lake utils early to assert dependencies availability
jorritsandbrink Jun 4, 2024
6f76587
handle file format adaptation at table level
jorritsandbrink Jun 4, 2024
bae5266
Merge branch 'devel' of https://github.com/dlt-hub/dlt into 978-files…
jorritsandbrink Jun 4, 2024
70fddc3
initialize file format variables
jorritsandbrink Jun 4, 2024
1a10d2d
split delta table format tests
jorritsandbrink Jun 4, 2024
e1e4772
handle table schema is None case
jorritsandbrink Jun 5, 2024
d25ebc4
add test for dynamic dispatching of delta tables
jorritsandbrink Jun 5, 2024
4420a36
mark core delta table test as essential
jorritsandbrink Jun 5, 2024
0bde8b9
simplify item normalizer dict key
jorritsandbrink Jun 5, 2024
86ab9ff
make list copy to prevent in place mutations
jorritsandbrink Jun 5, 2024
9a302dc
add extra deltalake dependency
jorritsandbrink Jun 5, 2024
6c3c8c7
only test deltalake lib on local filesystem
jorritsandbrink Jun 5, 2024
e17a54b
Merge branch 'devel' into 978-filesystem-delta-table
rudolfix Jun 5, 2024
bcfb418
properly evaluates lazy annotations
rudolfix Jun 5, 2024
8f7831f
uses base FilesystemConfiguration from common in libs
rudolfix Jun 5, 2024
e33af63
solves union type reordering due to caching and clash with delta-rs D…
rudolfix Jun 5, 2024
cdeefd2
creates a table with just root name to cache item normalizers properly
rudolfix Jun 5, 2024
a75a0f9
Merge branch 'devel' into 978-filesystem-delta-table
rudolfix Jun 5, 2024
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -75,7 +75,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
-run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline
+run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
2 changes: 1 addition & 1 deletion .github/workflows/test_local_destinations.yml
@@ -90,7 +90,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
-run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline
+run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
8 changes: 8 additions & 0 deletions dlt/common/configuration/specs/aws_credentials.py
@@ -45,6 +45,14 @@ def to_session_credentials(self) -> Dict[str, str]:
aws_session_token=self.aws_session_token,
)

def to_object_store_rs_credentials(self) -> Dict[str, str]:
# https://docs.rs/object_store/latest/object_store/aws
assert self.region_name is not None, "`object_store` Rust crate requires AWS region."
creds = self.to_session_credentials()
if creds["aws_session_token"] is None:
creds.pop("aws_session_token")
return {**creds, **{"region": self.region_name}}


@configspec
class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):
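A minimal sketch of the conversion added in `to_object_store_rs_credentials`, assuming a plain dataclass in place of dlt's credentials spec. `AwsCredsSketch` and the sample values are hypothetical; only the method logic follows the diff (region is required, an unset session token is dropped). The sketch raises `ValueError` where the diff uses `assert`.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class AwsCredsSketch:
    """Hypothetical stand-in for dlt's AWS credentials spec (not the real class)."""

    aws_access_key_id: str
    aws_secret_access_key: str
    aws_session_token: Optional[str] = None
    region_name: Optional[str] = None

    def to_session_credentials(self) -> Dict[str, Optional[str]]:
        return dict(
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            aws_session_token=self.aws_session_token,
        )

    def to_object_store_rs_credentials(self) -> Dict[str, Optional[str]]:
        # region is mandatory for the `object_store` crate, per the diff
        if self.region_name is None:
            raise ValueError("`object_store` Rust crate requires AWS region.")
        creds = self.to_session_credentials()
        # drop the token key entirely instead of passing `None`
        if creds["aws_session_token"] is None:
            creds.pop("aws_session_token")
        return {**creds, "region": self.region_name}


# dummy values only
creds = AwsCredsSketch("key", "secret", region_name="eu-central-1")
print(sorted(creds.to_object_store_rs_credentials()))
```

The returned dict contains no `aws_session_token` key when no token is configured, so nothing with a `None` value is handed to the storage layer.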
11 changes: 11 additions & 0 deletions dlt/common/configuration/specs/azure_credentials.py
@@ -27,6 +27,13 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
sas_token=self.azure_storage_sas_token,
)

def to_object_store_rs_credentials(self) -> Dict[str, str]:
# https://docs.rs/object_store/latest/object_store/azure
creds = self.to_adlfs_credentials()
if creds["sas_token"] is None:
creds.pop("sas_token")
return creds

def create_sas_token(self) -> None:
from azure.storage.blob import generate_account_sas, ResourceTypes

@@ -61,6 +68,10 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
client_secret=self.azure_client_secret,
)

def to_object_store_rs_credentials(self) -> Dict[str, str]:
# https://docs.rs/object_store/latest/object_store/azure
return self.to_adlfs_credentials()


@configspec
class AzureCredentials(AzureCredentialsWithoutDefaults, CredentialsWithDefault):
13 changes: 12 additions & 1 deletion dlt/common/configuration/specs/gcp_credentials.py
@@ -1,6 +1,6 @@
import dataclasses
import sys
-from typing import Any, ClassVar, Final, List, Tuple, Union, Dict
+from typing import Any, ClassVar, Final, List, Tuple, Union, Dict, Optional

from dlt.common.json import json
from dlt.common.pendulum import pendulum
@@ -74,6 +74,7 @@ def to_gcs_credentials(self) -> Dict[str, Any]:
@configspec
class GcpServiceAccountCredentialsWithoutDefaults(GcpCredentials):
private_key: TSecretValue = None
private_key_id: Optional[str] = None
client_email: str = None
type: Final[str] = dataclasses.field( # noqa: A003
default="service_account", init=False, repr=False, compare=False
@@ -122,6 +123,10 @@ def to_native_credentials(self) -> Any:
else:
return ServiceAccountCredentials.from_service_account_info(self)

def to_object_store_rs_credentials(self) -> Dict[str, str]:
# https://docs.rs/object_store/latest/object_store/gcp
return {"service_account_key": json.dumps(dict(self))}

def __str__(self) -> str:
return f"{self.client_email}@{self.project_id}"

@@ -171,6 +176,12 @@ def parse_native_representation(self, native_value: Any) -> None:
def to_native_representation(self) -> str:
return json.dumps(self._info_dict())

def to_object_store_rs_credentials(self) -> Dict[str, str]:
raise NotImplementedError(
"`object_store` Rust crate does not support OAuth for GCP credentials. Reference:"
" https://docs.rs/object_store/latest/object_store/gcp."
)

def auth(self, scopes: Union[str, List[str]] = None, redirect_url: str = None) -> None:
if not self.refresh_token:
self.add_scopes(scopes)
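For the GCP service account case, the diff above passes the whole credentials mapping as one JSON string under `service_account_key`. A small illustration, assuming a plain dict with dummy values stands in for the credentials object (`service_account_to_object_store_rs` is a hypothetical helper, not part of dlt):

```python
import json
from typing import Dict, Mapping


def service_account_to_object_store_rs(info: Mapping[str, str]) -> Dict[str, str]:
    """Serializes service account fields into a single JSON string value."""
    return {"service_account_key": json.dumps(dict(info))}


# dummy values only, not a usable key
info = {
    "type": "service_account",
    "project_id": "dummy-project",
    "client_email": "loader@dummy-project.example",
    "private_key": "dummy",
}
options = service_account_to_object_store_rs(info)
print(type(options["service_account_key"]).__name__)  # prints str
```

The value is a string rather than a nested dict, which keeps the result compatible with the `Dict[str, str]` return type used by the other credential classes.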
34 changes: 32 additions & 2 deletions dlt/common/destination/capabilities.py
@@ -1,4 +1,15 @@
-from typing import Any, Callable, ClassVar, List, Literal, Optional, Sequence, Tuple, Set, get_args
+from typing import (
+Any,
+Callable,
+ClassVar,
+Literal,
+Optional,
+Sequence,
+Tuple,
+Set,
+Protocol,
+get_args,
+)

from dlt.common.configuration.utils import serialize_value
from dlt.common.configuration import configspec
@@ -9,7 +20,6 @@
DestinationLoadingWithoutStagingNotSupported,
)
from dlt.common.utils import identity
from dlt.common.pendulum import pendulum

from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.wei import EVM_DECIMAL_PRECISION
@@ -23,12 +33,28 @@
ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))


class LoaderFileFormatAdapter(Protocol):
"""Callback protocol for `loader_file_format_adapter` capability."""

def __call__(
self,
preferred_loader_file_format: TLoaderFileFormat,
supported_loader_file_formats: Sequence[TLoaderFileFormat],
/,
*,
table_schema: "TTableSchema", # type: ignore[name-defined] # noqa: F821
) -> Tuple[TLoaderFileFormat, Sequence[TLoaderFileFormat]]: ...


@configspec
class DestinationCapabilitiesContext(ContainerInjectableContext):
"""Injectable destination capabilities required for many Pipeline stages ie. normalize"""

preferred_loader_file_format: TLoaderFileFormat = None
supported_loader_file_formats: Sequence[TLoaderFileFormat] = None
loader_file_format_adapter: LoaderFileFormatAdapter = None
"""Callable that adapts `preferred_loader_file_format` and `supported_loader_file_formats` at runtime."""
supported_table_formats: Sequence["TTableFormat"] = None # type: ignore[name-defined] # noqa: F821
recommended_file_size: Optional[int] = None
"""Recommended file size in bytes when writing extract/load files"""
preferred_staging_file_format: Optional[TLoaderFileFormat] = None
@@ -65,14 +91,18 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
@staticmethod
def generic_capabilities(
preferred_loader_file_format: TLoaderFileFormat = None,
loader_file_format_adapter: LoaderFileFormatAdapter = None,
supported_table_formats: Sequence["TTableFormat"] = None, # type: ignore[name-defined] # noqa: F821
) -> "DestinationCapabilitiesContext":
from dlt.common.data_writers.escape import format_datetime_literal

caps = DestinationCapabilitiesContext()
caps.preferred_loader_file_format = preferred_loader_file_format
caps.supported_loader_file_formats = ["jsonl", "insert_values", "parquet", "csv"]
caps.loader_file_format_adapter = loader_file_format_adapter
caps.preferred_staging_file_format = None
caps.supported_staging_file_formats = []
caps.supported_table_formats = supported_table_formats or []
caps.escape_identifier = identity
caps.escape_literal = serialize_value
caps.format_datetime_literal = format_datetime_literal
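The `LoaderFileFormatAdapter` protocol above only fixes the call signature. One way a destination could implement it is sketched here, under the assumption that tables with `table_format == "delta"` should always be written as parquet. The function name and the plain `dict`/`str` types are illustrative stand-ins for `TTableSchema` and `TLoaderFileFormat`.

```python
from typing import Any, Dict, Sequence, Tuple


def delta_aware_file_format_adapter(
    preferred_loader_file_format: str,
    supported_loader_file_formats: Sequence[str],
    /,
    *,
    table_schema: Dict[str, Any],
) -> Tuple[str, Sequence[str]]:
    """Narrows file formats to parquet for delta tables, otherwise passes them through."""
    # assumption for this sketch: delta tables are only produced from parquet files
    if table_schema.get("table_format") == "delta":
        return ("parquet", ["parquet"])
    return (preferred_loader_file_format, supported_loader_file_formats)


preferred, supported = delta_aware_file_format_adapter(
    "jsonl", ["jsonl", "parquet"], table_schema={"name": "events", "table_format": "delta"}
)
print(preferred, supported)  # prints parquet ['parquet']
```

The two format arguments are positional-only and `table_schema` is keyword-only, matching the `/` and `*` markers in the protocol, so the decision is made per table at runtime.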
5 changes: 4 additions & 1 deletion dlt/common/destination/reference.py
@@ -49,6 +49,7 @@
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.storages.load_package import LoadJobInfo

TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
@@ -313,7 +314,9 @@ def should_truncate_table_before_load(self, table: TTableSchema) -> bool:
return table["write_disposition"] == "replace"

def create_table_chain_completed_followup_jobs(
-self, table_chain: Sequence[TTableSchema]
+self,
+table_chain: Sequence[TTableSchema],
+table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
) -> List[NewLoadJob]:
"""Creates a list of followup jobs that should be executed after a table chain is completed"""
return []
90 changes: 90 additions & 0 deletions dlt/common/libs/deltalake.py
@@ -0,0 +1,90 @@
from typing import Optional, Dict, Union

from dlt import version
from dlt.common import logger
from dlt.common.libs.pyarrow import pyarrow as pa
from dlt.common.libs.pyarrow import dataset_to_table, cast_arrow_schema_types
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.exceptions import MissingDependencyException
from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration

try:
from deltalake import write_deltalake
except ModuleNotFoundError:
raise MissingDependencyException(
"dlt deltalake helpers",
[f"{version.DLT_PKG_NAME}[deltalake]"],
"Install `deltalake` so dlt can create Delta tables in the `filesystem` destination.",
)


def ensure_delta_compatible_arrow_table(table: pa.Table) -> pa.Table:
"""Returns Arrow table compatible with Delta table format.

Casts table schema to replace data types not supported by Delta.
"""
ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP = {
# maps type check function to the replacement Arrow data type
pa.types.is_null: pa.string(),
pa.types.is_time: pa.string(),
pa.types.is_decimal256: pa.string(), # pyarrow does not allow downcasting to decimal128
}
adjusted_schema = cast_arrow_schema_types(
table.schema, ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP
)
return table.cast(adjusted_schema)


def get_delta_write_mode(write_disposition: TWriteDisposition) -> str:
"""Translates dlt write disposition to Delta write mode."""
if write_disposition in ("append", "merge"): # `merge` disposition resolves to `append`
return "append"
elif write_disposition == "replace":
return "overwrite"
else:
raise ValueError(
"`write_disposition` must be `append`, `replace`, or `merge`,"
f" but `{write_disposition}` was provided."
)


def write_delta_table(
path: str,
data: Union[pa.Table, pa.dataset.Dataset],
write_disposition: TWriteDisposition,
storage_options: Optional[Dict[str, str]] = None,
) -> None:
"""Writes in-memory Arrow table to on-disk Delta table."""

table = dataset_to_table(data)

# throws warning for `s3` protocol: https://github.com/delta-io/delta-rs/issues/2460
# TODO: upgrade `deltalake` lib after https://github.com/delta-io/delta-rs/pull/2500
# is released
write_deltalake( # type: ignore[call-overload]
table_or_uri=path,
data=ensure_delta_compatible_arrow_table(table),
mode=get_delta_write_mode(write_disposition),
schema_mode="merge", # enable schema evolution (adding new columns)
storage_options=storage_options,
engine="rust", # `merge` schema mode requires `rust` engine
)


def _deltalake_storage_options(config: FilesystemDestinationClientConfiguration) -> Dict[str, str]:
"""Returns dict that can be passed as `storage_options` in `deltalake` library."""
creds = {}
extra_options = {}
if config.protocol in ("az", "gs", "s3"):
creds = config.credentials.to_object_store_rs_credentials()
if config.deltalake_storage_options is not None:
extra_options = config.deltalake_storage_options
shared_keys = creds.keys() & extra_options.keys()
if len(shared_keys) > 0:
logger.warning(
"The `deltalake_storage_options` configuration dictionary contains "
"keys also provided by dlt's credential system: "
+ ", ".join([f"`{key}`" for key in shared_keys])
+ ". dlt will use the values in `deltalake_storage_options`."
)
return {**creds, **extra_options}
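The merge at the end of `_deltalake_storage_options` gives user-supplied options precedence over credentials resolved by dlt. A self-contained sketch of that precedence rule, using plain dicts instead of the configuration object (`merge_storage_options`, the logger name, and `custom_option` are hypothetical):

```python
import logging
from typing import Dict, Optional

logger = logging.getLogger("storage_options_sketch")


def merge_storage_options(
    creds: Dict[str, str],
    extra_options: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Merges credentials with user options; user options win on shared keys."""
    extra = extra_options or {}
    shared_keys = sorted(creds.keys() & extra.keys())
    if shared_keys:
        logger.warning(
            "Keys provided twice, using user-supplied values: %s",
            ", ".join(f"`{key}`" for key in shared_keys),
        )
    # later entries override earlier ones in a dict merge
    return {**creds, **extra}


merged = merge_storage_options(
    {"aws_access_key_id": "key", "region": "eu-central-1"},
    {"region": "us-east-1", "custom_option": "true"},  # `custom_option` is a made-up key
)
print(merged["region"])  # prints us-east-1
```

Because `{**creds, **extra}` evaluates left to right, a key present in both dicts takes its value from the user-supplied options, which is what the warning in the diff announces.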
26 changes: 26 additions & 0 deletions dlt/common/libs/pyarrow.py
@@ -28,6 +28,7 @@
import pyarrow
import pyarrow.parquet
import pyarrow.compute
import pyarrow.dataset
except ModuleNotFoundError:
raise MissingDependencyException(
"dlt pyarrow helpers",
@@ -37,6 +38,8 @@

TAnyArrowItem = Union[pyarrow.Table, pyarrow.RecordBatch]

ARROW_DECIMAL_MAX_PRECISION = 76


def get_py_arrow_datatype(
column: TColumnType,
@@ -411,6 +414,29 @@ def pq_stream_with_new_columns(
yield tbl


def dataset_to_table(data: Union[pyarrow.Table, pyarrow.dataset.Dataset]) -> pyarrow.Table:
return data.to_table() if isinstance(data, pyarrow.dataset.Dataset) else data


def cast_arrow_schema_types(
schema: pyarrow.Schema,
type_map: Dict[Callable[[pyarrow.DataType], bool], Callable[..., pyarrow.DataType]],
) -> pyarrow.Schema:
"""Returns type-casted Arrow schema.

Replaces data types for fields matching a type check in `type_map`.
Type check functions in `type_map` are assumed to be mutually exclusive, i.e.
a data type does not match more than one type check function.
"""
for i, e in enumerate(schema.types):
for type_check, cast_type in type_map.items():
if type_check(e):
adjusted_field = schema.field(i).with_type(cast_type)
schema = schema.set(i, adjusted_field)
break # if type matches type check, do not do other type checks
return schema


class NameNormalizationClash(ValueError):
def __init__(self, reason: str) -> None:
msg = f"Arrow column name clash after input data normalization. {reason}"
2 changes: 1 addition & 1 deletion dlt/common/schema/typing.py
@@ -64,7 +64,7 @@
"dedup_sort",
]
"""Known hints of a column used to declare hint regexes."""
-TTableFormat = Literal["iceberg", "parquet", "jsonl"]
+TTableFormat = Literal["iceberg", "delta"]
TTypeDetections = Literal[
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
]
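With `TTableFormat` narrowed to `iceberg` and `delta`, the allowed values can be read off the `Literal` at runtime, the same way `capabilities.py` builds `ALL_SUPPORTED_FILE_FORMATS` with `get_args`. A short sketch (`ALL_TABLE_FORMATS` and `is_known_table_format` are hypothetical names, not part of dlt):

```python
from typing import Literal, Set, get_args

TTableFormat = Literal["iceberg", "delta"]

# derive the runtime set of allowed values from the type alias
ALL_TABLE_FORMATS: Set[str] = set(get_args(TTableFormat))


def is_known_table_format(value: str) -> bool:
    """Returns True if `value` is one of the literals in `TTableFormat`."""
    return value in ALL_TABLE_FORMATS


print(is_known_table_format("delta"))  # prints True
print(is_known_table_format("jsonl"))  # prints False
```

After this change `parquet` and `jsonl` are file formats only, so a check like this rejects them as table formats.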
1 change: 1 addition & 0 deletions dlt/common/storages/configuration.py
@@ -83,6 +83,7 @@ class FilesystemConfiguration(BaseConfiguration):
"""Indicates read only filesystem access. Will enable caching"""
kwargs: Optional[DictStrAny] = None
client_kwargs: Optional[DictStrAny] = None
deltalake_storage_options: Optional[DictStrAny] = None

@property
def protocol(self) -> str:
2 changes: 1 addition & 1 deletion dlt/common/storages/load_package.py
@@ -141,7 +141,7 @@ def create_load_id() -> str:


class ParsedLoadJobFileName(NamedTuple):
-"""Represents a file name of a job in load package. The file name contains name of a table, number of times the job was retired, extension
+"""Represents a file name of a job in load package. The file name contains name of a table, number of times the job was retried, extension
and a 5 bytes random string to make job file name unique.
The job id does not contain retry count and is immutable during loading of the data
"""
1 change: 1 addition & 0 deletions dlt/destinations/impl/athena/__init__.py
@@ -11,6 +11,7 @@ def capabilities() -> DestinationCapabilitiesContext:
# athena only supports loading from staged files on s3 for now
caps.preferred_loader_file_format = None
caps.supported_loader_file_formats = []
caps.supported_table_formats = ["iceberg"]
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["parquet", "jsonl"]
caps.escape_identifier = escape_athena_identifier
10 changes: 5 additions & 5 deletions dlt/destinations/impl/athena/athena.py
@@ -451,11 +451,11 @@ def _get_table_update_sql(
{partition_clause}
LOCATION '{location.rstrip('/')}'
TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""")
-elif table_format == "jsonl":
-sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}
-({columns})
-ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
-LOCATION '{location}';""")
+# elif table_format == "jsonl":
+# sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}
+# ({columns})
+# ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+# LOCATION '{location}';""")
else:
sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}
({columns})
5 changes: 4 additions & 1 deletion dlt/destinations/impl/dummy/dummy.py
@@ -16,6 +16,7 @@
from dlt.common.pendulum import pendulum
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.storages import FileStorage
from dlt.common.storages.load_package import LoadJobInfo
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.exceptions import (
DestinationTerminalException,
@@ -157,7 +158,9 @@ def restore_file_load(self, file_path: str) -> LoadJob:
return JOBS[job_id]

def create_table_chain_completed_followup_jobs(
-self, table_chain: Sequence[TTableSchema]
+self,
+table_chain: Sequence[TTableSchema],
+table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
) -> List[NewLoadJob]:
"""Creates a list of followup jobs that should be executed after a table chain is completed"""
return []