
iceberg table format support for filesystem destination #2067

Merged
79 commits, merged Dec 11, 2024
Commits (79)
79c018c
add pyiceberg dependency and upgrade mypy
jorritsandbrink Nov 14, 2024
5014f88
extend pyiceberg dependencies
jorritsandbrink Nov 15, 2024
c632dd7
remove redundant delta annotation
jorritsandbrink Nov 15, 2024
a3f6587
add basic local filesystem iceberg support
jorritsandbrink Nov 15, 2024
87553a6
add active table format setting
jorritsandbrink Nov 15, 2024
513662e
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Nov 15, 2024
10121be
disable merge tests for iceberg table format
jorritsandbrink Nov 15, 2024
23c4db3
restore non-redundant extra info
jorritsandbrink Nov 15, 2024
195ee4c
refactor to in-memory iceberg catalog
jorritsandbrink Nov 15, 2024
ee6e22e
add s3 support for iceberg table format
jorritsandbrink Nov 15, 2024
bc51008
add schema evolution support for iceberg table format
jorritsandbrink Nov 16, 2024
2de58a2
extract _register_table function
jorritsandbrink Nov 16, 2024
dd4ad0f
add partition support for iceberg table format
jorritsandbrink Nov 20, 2024
04be59b
update docstring
jorritsandbrink Nov 20, 2024
42f59c7
enable child table test for iceberg table format
jorritsandbrink Nov 21, 2024
a540135
enable empty source test for iceberg table format
jorritsandbrink Nov 21, 2024
3d1dc63
make iceberg catalog namespace configurable and default to dataset name
jorritsandbrink Nov 22, 2024
59e6d08
add optional typing
jorritsandbrink Nov 22, 2024
71e436d
fix typo
jorritsandbrink Nov 24, 2024
2effa8f
improve typing
jorritsandbrink Nov 24, 2024
8979ee1
extract logic into dedicated function
jorritsandbrink Nov 24, 2024
e956b09
add iceberg read support to filesystem sql client
jorritsandbrink Nov 24, 2024
571bf0c
remove unused import
jorritsandbrink Nov 24, 2024
0ec5fcb
add todo
jorritsandbrink Nov 24, 2024
ab0b9a0
extract logic into separate functions
jorritsandbrink Nov 24, 2024
e149ba6
add azure support for iceberg table format
jorritsandbrink Nov 24, 2024
27b8659
generalize delta table format tests
jorritsandbrink Nov 24, 2024
d39d58d
enable get tables function test for iceberg table format
jorritsandbrink Nov 24, 2024
2f910c2
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Nov 24, 2024
547a37a
remove ignores
jorritsandbrink Nov 25, 2024
53b2d56
undo table directory management change
jorritsandbrink Nov 25, 2024
3ff9fb7
enable test_read_interfaces tests for iceberg
jorritsandbrink Nov 25, 2024
5b0cd17
fix active table format filter
jorritsandbrink Nov 25, 2024
8798d79
use mixin for object store rs credentials
jorritsandbrink Nov 25, 2024
c1cc068
generalize catalog typing
jorritsandbrink Nov 25, 2024
35f590b
extract pyiceberg scheme mapping into separate function
jorritsandbrink Nov 26, 2024
e0d6a1b
generalize credentials mixin test setup
jorritsandbrink Nov 26, 2024
85a10a2
remove unused import
jorritsandbrink Nov 26, 2024
54cd0bc
add centralized fallback to append when merge is not supported
jorritsandbrink Nov 26, 2024
4e979f0
Revert "add centralized fallback to append when merge is not supported"
jorritsandbrink Nov 27, 2024
54f1353
fall back to append if merge is not supported on filesystem
jorritsandbrink Nov 27, 2024
28d0fd2
fix test for s3-compatible storage
jorritsandbrink Nov 27, 2024
90b1729
remove obsolete code path
jorritsandbrink Nov 27, 2024
d0f7c88
exclude gcs read interface tests for iceberg
jorritsandbrink Nov 27, 2024
050bea7
add gcs support for iceberg table format
jorritsandbrink Nov 28, 2024
ff48ca9
switch to UnsupportedAuthenticationMethodException
jorritsandbrink Nov 28, 2024
01e8d26
add iceberg table format docs
jorritsandbrink Nov 28, 2024
ef29aa7
use shorter pipeline name to prevent too long sql identifiers
jorritsandbrink Nov 29, 2024
f463d06
add iceberg catalog note to docs
jorritsandbrink Nov 29, 2024
fcc05ee
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Nov 29, 2024
d50aaa1
black format
jorritsandbrink Nov 29, 2024
6cce03b
use shorter pipeline name to prevent too long sql identifiers
jorritsandbrink Nov 29, 2024
fc61663
correct max id length for sqlalchemy mysql dialect
jorritsandbrink Nov 29, 2024
b011907
Revert "use shorter pipeline name to prevent too long sql identifiers"
jorritsandbrink Nov 29, 2024
e748dcf
Revert "use shorter pipeline name to prevent too long sql identifiers"
jorritsandbrink Nov 29, 2024
1b47893
replace show with execute to prevent useless print output
jorritsandbrink Nov 29, 2024
133f1ce
add abfss scheme to test
jorritsandbrink Nov 30, 2024
eceb19f
remove az support for iceberg table format
jorritsandbrink Nov 30, 2024
e75114d
remove iceberg bucket test exclusion
jorritsandbrink Nov 30, 2024
049c008
add note to docs on azure scheme support for iceberg table format
jorritsandbrink Nov 30, 2024
a0fc017
exclude iceberg from duckdb s3-compatibility test
jorritsandbrink Dec 1, 2024
ba75445
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Dec 1, 2024
de0086e
disable pyiceberg info logs for tests
jorritsandbrink Dec 1, 2024
ca7f655
extend table format docs and move into own page
jorritsandbrink Dec 1, 2024
2ba8fcb
upgrade adlfs to enable account_host attribute
jorritsandbrink Dec 1, 2024
0517a95
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Dec 2, 2024
1c2b9b4
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Dec 2, 2024
872432e
fix lint errors
jorritsandbrink Dec 3, 2024
9c44290
re-add pyiceberg dependency
jorritsandbrink Dec 3, 2024
c129b9e
enabled iceberg in dbt-duckdb
rudolfix Dec 6, 2024
6992d56
upgrade pyiceberg version
jorritsandbrink Dec 10, 2024
156d518
Merge branch 'feat/1996-iceberg-filesystem' of https://github.com/dlt…
jorritsandbrink Dec 10, 2024
aa19f13
remove pyiceberg mypy errors across python version
jorritsandbrink Dec 10, 2024
c07c9f6
Merge branch 'devel' into feat/1996-iceberg-filesystem
rudolfix Dec 10, 2024
b7f6dbf
does not install airflow group for dev
rudolfix Dec 10, 2024
9cad3ec
fixes gcp oauth iceberg credentials handling
rudolfix Dec 10, 2024
346b270
fixes ca cert bundle duckdb azure on ci
rudolfix Dec 10, 2024
08f8ee3
Merge branch 'devel' into feat/1996-iceberg-filesystem
rudolfix Dec 11, 2024
accb62d
allow for airflow dep to be present during type check
rudolfix Dec 11, 2024
9 changes: 7 additions & 2 deletions .github/workflows/test_destinations.yml
@@ -77,8 +77,13 @@ jobs:
# key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-redshift

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline,ibis -E deltalake
run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline,ibis -E deltalake -E pyiceberg

- name: enable certificates for azure and duckdb
run: sudo mkdir -p /etc/pki/tls/certs && sudo ln -s /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt

- name: Upgrade sqlalchemy
run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg`

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
Expand Down
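Both workflow files pin `sqlalchemy` to 2.0.18, the minimum version `pyiceberg` requires. dlt already ships a helper for this kind of guard (`assert_min_pkg_version`, imported in `dlt/common/libs/deltalake.py` in this PR); the stdlib-only simplification below is hypothetical and only handles plain numeric versions:

```python
from importlib.metadata import PackageNotFoundError, version


def meets_min_version(installed: str, required: str) -> bool:
    """Compare dotted numeric versions, e.g. '2.0.18' >= '2.0.18'.
    Real specifiers (rc/dev tags) would need the `packaging` library."""

    def parse(v: str) -> tuple:
        return tuple(int(part) for part in v.split("."))

    return parse(installed) >= parse(required)


def installed_pkg_meets(pkg: str, required: str) -> bool:
    # returns False when the package is not installed at all
    try:
        return meets_min_version(version(pkg), required)
    except PackageNotFoundError:
        return False


print(meets_min_version("2.0.18", "2.0.18"))  # True
print(meets_min_version("1.4.49", "2.0.18"))  # False
```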
5 changes: 4 additions & 1 deletion .github/workflows/test_local_destinations.yml
@@ -95,7 +95,10 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline,ibis -E deltalake
run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline,ibis -E deltalake -E pyiceberg

- name: Upgrade sqlalchemy
run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg`

- name: Start SFTP server
run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d
Expand Down
2 changes: 1 addition & 1 deletion Makefile
@@ -44,7 +44,7 @@ has-poetry:
poetry --version

dev: has-poetry
poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk,airflow
poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk

lint:
./tools/check-package.sh
Expand Down
3 changes: 1 addition & 2 deletions dlt/cli/source_detection.py
@@ -29,8 +29,7 @@ def find_call_arguments_to_replace(
if not isinstance(dn_node, ast.Constant) or not isinstance(dn_node.value, str):
raise CliCommandInnerException(
"init",
f"The pipeline script {init_script_name} must pass the {t_arg_name} as"
f" string to '{arg_name}' function in line {dn_node.lineno}",
f"The pipeline script {init_script_name} must pass the {t_arg_name} as string to '{arg_name}' function in line {dn_node.lineno}", # type: ignore[attr-defined]
)
else:
transformed_nodes.append((dn_node, ast.Constant(value=t_value, kind=None)))
Expand Down
15 changes: 14 additions & 1 deletion dlt/common/configuration/specs/aws_credentials.py
@@ -8,6 +8,7 @@
CredentialsWithDefault,
configspec,
)
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials, WithPyicebergConfig
from dlt.common.configuration.specs.exceptions import (
InvalidBoto3Session,
ObjectStoreRsCredentialsException,
@@ -16,7 +17,9 @@


@configspec
class AwsCredentialsWithoutDefaults(CredentialsConfiguration):
class AwsCredentialsWithoutDefaults(
CredentialsConfiguration, WithObjectStoreRsCredentials, WithPyicebergConfig
):
# credentials without boto implementation
aws_access_key_id: str = None
aws_secret_access_key: TSecretStrValue = None
@@ -77,6 +80,16 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:

return creds

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
return {
"s3.access-key-id": self.aws_access_key_id,
"s3.secret-access-key": self.aws_secret_access_key,
"s3.session-token": self.aws_session_token,
"s3.region": self.region_name,
"s3.endpoint": self.endpoint_url,
"s3.connect-timeout": 300,
}


@configspec
class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):
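The new `to_pyiceberg_fileio_config` implementations translate dlt credential fields into `pyiceberg` FileIO property keys. A standalone sketch of the S3 mapping above (the free function is hypothetical; in the PR this logic is the method on `AwsCredentialsWithoutDefaults`, and unset fields pass through as `None` exactly as in the diff):

```python
from typing import Any, Dict, Optional


def aws_to_pyiceberg_fileio_config(
    aws_access_key_id: str,
    aws_secret_access_key: str,
    aws_session_token: Optional[str] = None,
    region_name: Optional[str] = None,
    endpoint_url: Optional[str] = None,
) -> Dict[str, Any]:
    """Mirror AwsCredentialsWithoutDefaults.to_pyiceberg_fileio_config:
    dlt credential fields -> pyiceberg FileIO s3.* properties."""
    return {
        "s3.access-key-id": aws_access_key_id,
        "s3.secret-access-key": aws_secret_access_key,
        "s3.session-token": aws_session_token,
        "s3.region": region_name,
        "s3.endpoint": endpoint_url,
        "s3.connect-timeout": 300,
    }


config = aws_to_pyiceberg_fileio_config("AKIA-example", "secret", region_name="eu-central-1")
print(config["s3.region"])  # eu-central-1
```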
22 changes: 19 additions & 3 deletions dlt/common/configuration/specs/azure_credentials.py
@@ -8,14 +8,15 @@
CredentialsWithDefault,
configspec,
)
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials, WithPyicebergConfig
from dlt import version
from dlt.common.utils import without_none

_AZURE_STORAGE_EXTRA = f"{version.DLT_PKG_NAME}[az]"


@configspec
class AzureCredentialsBase(CredentialsConfiguration):
class AzureCredentialsBase(CredentialsConfiguration, WithObjectStoreRsCredentials):
azure_storage_account_name: str = None
azure_account_host: Optional[str] = None
"""Alternative host when accessing blob storage endpoint ie. my_account.dfs.core.windows.net"""
@@ -32,7 +33,7 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:


@configspec
class AzureCredentialsWithoutDefaults(AzureCredentialsBase):
class AzureCredentialsWithoutDefaults(AzureCredentialsBase, WithPyicebergConfig):
"""Credentials for Azure Blob Storage, compatible with adlfs"""

azure_storage_account_key: Optional[TSecretStrValue] = None
@@ -49,6 +50,13 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
account_host=self.azure_account_host,
)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
return {
"adlfs.account-name": self.azure_storage_account_name,
"adlfs.account-key": self.azure_storage_account_key,
"adlfs.sas-token": self.azure_storage_sas_token,
}

def create_sas_token(self) -> None:
try:
from azure.storage.blob import generate_account_sas, ResourceTypes
@@ -72,7 +80,7 @@ def on_partial(self) -> None:


@configspec
class AzureServicePrincipalCredentialsWithoutDefaults(AzureCredentialsBase):
class AzureServicePrincipalCredentialsWithoutDefaults(AzureCredentialsBase, WithPyicebergConfig):
azure_tenant_id: str = None
azure_client_id: str = None
azure_client_secret: TSecretStrValue = None
@@ -86,6 +94,14 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
client_secret=self.azure_client_secret,
)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
return {
"adlfs.account-name": self.azure_storage_account_name,
"adlfs.tenant-id": self.azure_tenant_id,
"adlfs.client-id": self.azure_client_id,
"adlfs.client-secret": self.azure_client_secret,
}


@configspec
class AzureCredentials(AzureCredentialsWithoutDefaults, CredentialsWithDefault):
2 changes: 1 addition & 1 deletion dlt/common/configuration/specs/base_configuration.py
@@ -359,7 +359,7 @@ def _get_resolvable_dataclass_fields(cls) -> Iterator[TDtcField]:
def get_resolvable_fields(cls) -> Dict[str, type]:
"""Returns a mapping of fields to their type hints. Dunders should not be resolved and are not returned"""
return {
f.name: eval(f.type) if isinstance(f.type, str) else f.type # type: ignore[arg-type]
f.name: eval(f.type) if isinstance(f.type, str) else f.type
for f in cls._get_resolvable_dataclass_fields()
}

7 changes: 1 addition & 6 deletions dlt/common/configuration/specs/config_providers_context.py
@@ -1,5 +1,4 @@
import contextlib
import dataclasses
import io
from typing import ClassVar, List

@@ -8,10 +7,6 @@
ConfigProvider,
ContextProvider,
)
from dlt.common.configuration.specs.base_configuration import (
ContainerInjectableContext,
NotResolved,
)
from dlt.common.configuration.specs import (
GcpServiceAccountCredentials,
BaseConfiguration,
@@ -137,7 +132,7 @@ def _airflow_providers() -> List[ConfigProvider]:
# check if we are in task context and provide more info
from airflow.operators.python import get_current_context # noqa

ti: TaskInstance = get_current_context()["ti"] # type: ignore
ti: TaskInstance = get_current_context()["ti"] # type: ignore[assignment,unused-ignore]

# log outside of stderr/out redirect
if secrets_toml_var is None:
4 changes: 4 additions & 0 deletions dlt/common/configuration/specs/exceptions.py
@@ -72,3 +72,7 @@ def __init__(self, spec: Type[Any], native_value: Any):

class ObjectStoreRsCredentialsException(ConfigurationException):
pass


class UnsupportedAuthenticationMethodException(ConfigurationException):
pass
36 changes: 32 additions & 4 deletions dlt/common/configuration/specs/gcp_credentials.py
@@ -11,7 +11,9 @@
InvalidGoogleServicesJson,
NativeValueError,
OAuth2ScopesRequired,
UnsupportedAuthenticationMethodException,
)
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials, WithPyicebergConfig
from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import DictStrAny, TSecretStrValue, StrAny
from dlt.common.configuration.specs.base_configuration import (
@@ -23,7 +25,7 @@


@configspec
class GcpCredentials(CredentialsConfiguration):
class GcpCredentials(CredentialsConfiguration, WithObjectStoreRsCredentials, WithPyicebergConfig):
token_uri: Final[str] = dataclasses.field(
default="https://oauth2.googleapis.com/token", init=False, repr=False, compare=False
)
@@ -126,6 +128,12 @@ def to_native_credentials(self) -> Any:
else:
return ServiceAccountCredentials.from_service_account_info(self)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
raise UnsupportedAuthenticationMethodException(
"Service Account authentication not supported with `iceberg` table format. Use OAuth"
" authentication instead."
)

def __str__(self) -> str:
return f"{self.client_email}@{self.project_id}"

@@ -176,11 +184,19 @@ def to_native_representation(self) -> str:
return json.dumps(self._info_dict())

def to_object_store_rs_credentials(self) -> Dict[str, str]:
raise NotImplementedError(
"`object_store` Rust crate does not support OAuth for GCP credentials. Reference:"
" https://docs.rs/object_store/latest/object_store/gcp."
raise UnsupportedAuthenticationMethodException(
"OAuth authentication not supported with `delta` table format. Use Service Account or"
" Application Default Credentials authentication instead."
)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
self.auth()
return {
"gcs.project-id": self.project_id,
"gcs.oauth2.token": self.token,
"gcs.oauth2.token-expires-at": (pendulum.now().timestamp() + 60) * 1000,
}

def auth(self, scopes: Union[str, List[str]] = None, redirect_url: str = None) -> None:
if not self.refresh_token:
self.add_scopes(scopes)
@@ -313,6 +329,12 @@ def to_native_credentials(self) -> Any:
else:
return super().to_native_credentials()

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
raise UnsupportedAuthenticationMethodException(
"Application Default Credentials authentication not supported with `iceberg` table"
" format. Use OAuth authentication instead."
)


@configspec
class GcpServiceAccountCredentials(
@@ -334,3 +356,9 @@ def parse_native_representation(self, native_value: Any) -> None:
except NativeValueError:
pass
GcpOAuthCredentialsWithoutDefaults.parse_native_representation(self, native_value)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
if self.has_default_credentials():
return GcpDefaultCredentials.to_pyiceberg_fileio_config(self)
else:
return GcpOAuthCredentialsWithoutDefaults.to_pyiceberg_fileio_config(self)
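For the OAuth path, `gcs.oauth2.token-expires-at` is an epoch timestamp in milliseconds; the diff computes it as `(pendulum.now().timestamp() + 60) * 1000`, i.e. roughly a 60-second validity window. The same arithmetic with the stdlib (the helper name is hypothetical):

```python
import time


def gcs_token_expiry_ms(ttl_seconds: int = 60) -> int:
    # pyiceberg reads gcs.oauth2.token-expires-at as milliseconds since epoch;
    # mirrors (pendulum.now().timestamp() + 60) * 1000 from the diff
    return int((time.time() + ttl_seconds) * 1000)


expires_at = gcs_token_expiry_ms()
```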
24 changes: 24 additions & 0 deletions dlt/common/configuration/specs/mixins.py
@@ -0,0 +1,24 @@
from typing import Dict, Any
from abc import abstractmethod, ABC


class WithObjectStoreRsCredentials(ABC):
@abstractmethod
def to_object_store_rs_credentials(self) -> Dict[str, Any]:
"""Returns credentials dictionary for object_store Rust crate.

Can be used for libraries that build on top of the object_store crate, such as `deltalake`.

https://docs.rs/object_store/latest/object_store/
"""
pass


class WithPyicebergConfig(ABC):
@abstractmethod
def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
"""Returns `pyiceberg` FileIO configuration dictionary.

https://py.iceberg.apache.org/configuration/#fileio
"""
pass
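These ABCs let call sites discover a credential's capabilities with `isinstance` instead of `hasattr` probing, which is exactly the swap made in `dlt/common/libs/deltalake.py` in this PR. A self-contained sketch, with a hypothetical `DummyCredentials` class standing in for a real spec:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class WithObjectStoreRsCredentials(ABC):
    @abstractmethod
    def to_object_store_rs_credentials(self) -> Dict[str, Any]: ...


class WithPyicebergConfig(ABC):
    @abstractmethod
    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]: ...


class DummyCredentials(WithObjectStoreRsCredentials, WithPyicebergConfig):
    # hypothetical credentials spec implementing both mixins
    def to_object_store_rs_credentials(self) -> Dict[str, Any]:
        return {"aws_access_key_id": "AKIA-example"}

    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
        return {"s3.access-key-id": "AKIA-example"}


creds: Any = DummyCredentials()
# isinstance check replaces hasattr(creds, "to_object_store_rs_credentials")
if isinstance(creds, WithObjectStoreRsCredentials):
    storage_creds = creds.to_object_store_rs_credentials()
```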
2 changes: 1 addition & 1 deletion dlt/common/data_writers/buffered.py
@@ -242,7 +242,7 @@ def _flush_items(self, allow_empty_file: bool = False) -> None:
if self.writer_spec.is_binary_format:
self._file = self.open(self._file_name, "wb") # type: ignore
else:
self._file = self.open(self._file_name, "wt", encoding="utf-8", newline="") # type: ignore
self._file = self.open(self._file_name, "wt", encoding="utf-8", newline="")
self._writer = self.writer_cls(self._file, caps=self._caps) # type: ignore[assignment]
self._writer.write_header(self._current_columns)
# write buffer
2 changes: 1 addition & 1 deletion dlt/common/destination/utils.py
@@ -38,7 +38,7 @@ def verify_schema_capabilities(
exception_log: List[Exception] = []
# combined casing function
case_identifier = lambda ident: capabilities.casefold_identifier(
(str if capabilities.has_case_sensitive_identifiers else str.casefold)(ident) # type: ignore
(str if capabilities.has_case_sensitive_identifiers else str.casefold)(ident)
)
table_name_lookup: DictStrStr = {}
# name collision explanation
6 changes: 3 additions & 3 deletions dlt/common/libs/deltalake.py
@@ -10,6 +10,7 @@
from dlt.common.exceptions import MissingDependencyException
from dlt.common.storages import FilesystemConfiguration
from dlt.common.utils import assert_min_pkg_version
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient

try:
@@ -191,10 +192,9 @@

def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str]:
"""Returns dict that can be passed as `storage_options` in `deltalake` library."""
creds = {} # type: ignore
creds = {}
extra_options = {}
# TODO: create a mixin with to_object_store_rs_credentials for a proper discovery
if hasattr(config.credentials, "to_object_store_rs_credentials"):
if isinstance(config.credentials, WithObjectStoreRsCredentials):
creds = config.credentials.to_object_store_rs_credentials()
if config.deltalake_storage_options is not None:
extra_options = config.deltalake_storage_options
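`_deltalake_storage_options` builds `creds` from the mixin and `extra_options` from `config.deltalake_storage_options`; assuming it returns `{**creds, **extra_options}` (the tail of the hunk is collapsed), user-supplied options win on key collisions. A sketch of that merge semantics with a hypothetical helper:

```python
from typing import Dict


def merge_storage_options(creds: Dict[str, str], extra: Dict[str, str]) -> Dict[str, str]:
    # later unpacking wins on duplicate keys, so deltalake_storage_options
    # can override credential-derived values
    return {**creds, **extra}


opts = merge_storage_options(
    {"aws_region": "eu-central-1", "timeout": "30s"},
    {"timeout": "120s"},
)
print(opts)  # {'aws_region': 'eu-central-1', 'timeout': '120s'}
```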