iceberg table format support for filesystem destination #2067

Merged on Dec 11, 2024 (79 commits)

Commits
79c018c
add pyiceberg dependency and upgrade mypy
jorritsandbrink Nov 14, 2024
5014f88
extend pyiceberg dependencies
jorritsandbrink Nov 15, 2024
c632dd7
remove redundant delta annotation
jorritsandbrink Nov 15, 2024
a3f6587
add basic local filesystem iceberg support
jorritsandbrink Nov 15, 2024
87553a6
add active table format setting
jorritsandbrink Nov 15, 2024
513662e
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Nov 15, 2024
10121be
disable merge tests for iceberg table format
jorritsandbrink Nov 15, 2024
23c4db3
restore non-redundant extra info
jorritsandbrink Nov 15, 2024
195ee4c
refactor to in-memory iceberg catalog
jorritsandbrink Nov 15, 2024
ee6e22e
add s3 support for iceberg table format
jorritsandbrink Nov 15, 2024
bc51008
add schema evolution support for iceberg table format
jorritsandbrink Nov 16, 2024
2de58a2
extract _register_table function
jorritsandbrink Nov 16, 2024
dd4ad0f
add partition support for iceberg table format
jorritsandbrink Nov 20, 2024
04be59b
update docstring
jorritsandbrink Nov 20, 2024
42f59c7
enable child table test for iceberg table format
jorritsandbrink Nov 21, 2024
a540135
enable empty source test for iceberg table format
jorritsandbrink Nov 21, 2024
3d1dc63
make iceberg catalog namespace configurable and default to dataset name
jorritsandbrink Nov 22, 2024
59e6d08
add optional typing
jorritsandbrink Nov 22, 2024
71e436d
fix typo
jorritsandbrink Nov 24, 2024
2effa8f
improve typing
jorritsandbrink Nov 24, 2024
8979ee1
extract logic into dedicated function
jorritsandbrink Nov 24, 2024
e956b09
add iceberg read support to filesystem sql client
jorritsandbrink Nov 24, 2024
571bf0c
remove unused import
jorritsandbrink Nov 24, 2024
0ec5fcb
add todo
jorritsandbrink Nov 24, 2024
ab0b9a0
extract logic into separate functions
jorritsandbrink Nov 24, 2024
e149ba6
add azure support for iceberg table format
jorritsandbrink Nov 24, 2024
27b8659
generalize delta table format tests
jorritsandbrink Nov 24, 2024
d39d58d
enable get tables function test for iceberg table format
jorritsandbrink Nov 24, 2024
2f910c2
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Nov 24, 2024
547a37a
remove ignores
jorritsandbrink Nov 25, 2024
53b2d56
undo table directory management change
jorritsandbrink Nov 25, 2024
3ff9fb7
enable test_read_interfaces tests for iceberg
jorritsandbrink Nov 25, 2024
5b0cd17
fix active table format filter
jorritsandbrink Nov 25, 2024
8798d79
use mixin for object store rs credentials
jorritsandbrink Nov 25, 2024
c1cc068
generalize catalog typing
jorritsandbrink Nov 25, 2024
35f590b
extract pyiceberg scheme mapping into separate function
jorritsandbrink Nov 26, 2024
e0d6a1b
generalize credentials mixin test setup
jorritsandbrink Nov 26, 2024
85a10a2
remove unused import
jorritsandbrink Nov 26, 2024
54cd0bc
add centralized fallback to append when merge is not supported
jorritsandbrink Nov 26, 2024
4e979f0
Revert "add centralized fallback to append when merge is not supported"
jorritsandbrink Nov 27, 2024
54f1353
fall back to append if merge is not supported on filesystem
jorritsandbrink Nov 27, 2024
28d0fd2
fix test for s3-compatible storage
jorritsandbrink Nov 27, 2024
90b1729
remove obsolete code path
jorritsandbrink Nov 27, 2024
d0f7c88
exclude gcs read interface tests for iceberg
jorritsandbrink Nov 27, 2024
050bea7
add gcs support for iceberg table format
jorritsandbrink Nov 28, 2024
ff48ca9
switch to UnsupportedAuthenticationMethodException
jorritsandbrink Nov 28, 2024
01e8d26
add iceberg table format docs
jorritsandbrink Nov 28, 2024
ef29aa7
use shorter pipeline name to prevent too long sql identifiers
jorritsandbrink Nov 29, 2024
f463d06
add iceberg catalog note to docs
jorritsandbrink Nov 29, 2024
fcc05ee
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Nov 29, 2024
d50aaa1
black format
jorritsandbrink Nov 29, 2024
6cce03b
use shorter pipeline name to prevent too long sql identifiers
jorritsandbrink Nov 29, 2024
fc61663
correct max id length for sqlalchemy mysql dialect
jorritsandbrink Nov 29, 2024
b011907
Revert "use shorter pipeline name to prevent too long sql identifiers"
jorritsandbrink Nov 29, 2024
e748dcf
Revert "use shorter pipeline name to prevent too long sql identifiers"
jorritsandbrink Nov 29, 2024
1b47893
replace show with execute to prevent useless print output
jorritsandbrink Nov 29, 2024
133f1ce
add abfss scheme to test
jorritsandbrink Nov 30, 2024
eceb19f
remove az support for iceberg table format
jorritsandbrink Nov 30, 2024
e75114d
remove iceberg bucket test exclusion
jorritsandbrink Nov 30, 2024
049c008
add note to docs on azure scheme support for iceberg table format
jorritsandbrink Nov 30, 2024
a0fc017
exclude iceberg from duckdb s3-compatibility test
jorritsandbrink Dec 1, 2024
ba75445
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Dec 1, 2024
de0086e
disable pyiceberg info logs for tests
jorritsandbrink Dec 1, 2024
ca7f655
extend table format docs and move into own page
jorritsandbrink Dec 1, 2024
2ba8fcb
upgrade adlfs to enable account_host attribute
jorritsandbrink Dec 1, 2024
0517a95
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Dec 2, 2024
1c2b9b4
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996…
jorritsandbrink Dec 2, 2024
872432e
fix lint errors
jorritsandbrink Dec 3, 2024
9c44290
re-add pyiceberg dependency
jorritsandbrink Dec 3, 2024
c129b9e
enabled iceberg in dbt-duckdb
rudolfix Dec 6, 2024
6992d56
upgrade pyiceberg version
jorritsandbrink Dec 10, 2024
156d518
Merge branch 'feat/1996-iceberg-filesystem' of https://github.com/dlt…
jorritsandbrink Dec 10, 2024
aa19f13
remove pyiceberg mypy errors across python version
jorritsandbrink Dec 10, 2024
c07c9f6
Merge branch 'devel' into feat/1996-iceberg-filesystem
rudolfix Dec 10, 2024
b7f6dbf
does not install airflow group for dev
rudolfix Dec 10, 2024
9cad3ec
fixes gcp oauth iceberg credentials handling
rudolfix Dec 10, 2024
346b270
fixes ca cert bundle duckdb azure on ci
rudolfix Dec 10, 2024
08f8ee3
Merge branch 'devel' into feat/1996-iceberg-filesystem
rudolfix Dec 11, 2024
accb62d
allow for airflow dep to be present during type check
rudolfix Dec 11, 2024
extract logic into separate functions
jorritsandbrink committed Nov 24, 2024
commit ab0b9a0c070f07a188ee0b810d030c0757117397
56 changes: 41 additions & 15 deletions dlt/common/libs/pyiceberg.py
@@ -5,6 +5,7 @@
 from dlt.common.schema.typing import TWriteDisposition
 from dlt.common.utils import assert_min_pkg_version
 from dlt.common.exceptions import MissingDependencyException
+from dlt.common.storages.configuration import FileSystemCredentials
 from dlt.common.configuration.specs import CredentialsConfiguration
 from dlt.common.configuration.specs.mixins import WithPyicebergConfig
 from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
@@ -51,27 +52,22 @@ def write_iceberg_table(
         table.overwrite(ensure_iceberg_compatible_arrow_data(data))


-def get_catalog(
+def get_sql_catalog(credentials: FileSystemCredentials) -> SqlCatalog:
+    return SqlCatalog(
+        "default",
+        uri="sqlite:///:memory:",
+        **_get_fileio_config(credentials),
+    )
+
+
+def create_or_evolve_table(
+    catalog: SqlCatalog,
     client: FilesystemClient,
     table_name: str,
     namespace_name: Optional[str] = None,
     schema: Optional[pa.Schema] = None,
     partition_columns: Optional[List[str]] = None,
 ) -> SqlCatalog:
-    """Returns single-table, ephemeral, in-memory Iceberg catalog."""
-
-    # create in-memory catalog
-    catalog = SqlCatalog(
-        "default",
-        uri="sqlite:///:memory:",
-        **_get_fileio_config(client.config.credentials),
-    )
-
-    # create namespace
-    if namespace_name is None:
-        namespace_name = client.dataset_name
-    catalog.create_namespace(namespace_name)
-
     # add table to catalog
     table_id = f"{namespace_name}.{table_name}"
     table_path = f"{client.dataset_path}/{table_name}"
@@ -100,6 +96,36 @@ def get_catalog(
     return catalog


+def get_catalog(
+    client: FilesystemClient,
+    table_name: str,
+    namespace_name: Optional[str] = None,
+    schema: Optional[pa.Schema] = None,
+    partition_columns: Optional[List[str]] = None,
+) -> SqlCatalog:
+    """Returns single-table, ephemeral, in-memory Iceberg catalog."""
+
+    # create in-memory catalog
+    catalog = get_sql_catalog(client.config.credentials)
+
+    # create namespace
+    if namespace_name is None:
+        namespace_name = client.dataset_name
+    catalog.create_namespace(namespace_name)
+
+    # add table to catalog
+    catalog = create_or_evolve_table(
+        catalog=catalog,
+        client=client,
+        table_name=table_name,
+        namespace_name=namespace_name,
+        schema=schema,
+        partition_columns=partition_columns,
+    )
+
+    return catalog
+
+
 def get_iceberg_tables(
     pipeline: Pipeline, *tables: str, schema_name: Optional[str] = None
 ) -> Dict[str, IcebergTable]:
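
For readers skimming the diff, the three-way split can be sketched without pyiceberg installed. The snippet below is a minimal illustration under stated assumptions, not the PR's code: `InMemoryCatalog` and `Client` are hypothetical stand-ins for `SqlCatalog` and `FilesystemClient`, and credentials, schema, and partition handling are left out. Only the call order (build catalog, create namespace, register table) and the namespace default follow the diff above.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class InMemoryCatalog:
    """Hypothetical stand-in for pyiceberg's SqlCatalog; not the real API."""

    name: str
    namespaces: List[str] = field(default_factory=list)
    tables: Dict[str, str] = field(default_factory=dict)  # table id -> location

    def create_namespace(self, namespace: str) -> None:
        self.namespaces.append(namespace)


@dataclass
class Client:
    """Hypothetical stand-in for FilesystemClient, reduced to two fields."""

    dataset_name: str
    dataset_path: str


def get_sql_catalog() -> InMemoryCatalog:
    # Step 1: build an empty, ephemeral catalog (credentials omitted here).
    return InMemoryCatalog("default")


def create_or_evolve_table(
    catalog: InMemoryCatalog, client: Client, table_name: str, namespace_name: str
) -> InMemoryCatalog:
    # Step 2: register the table as "<namespace>.<table>" at its storage path.
    table_id = f"{namespace_name}.{table_name}"
    catalog.tables[table_id] = f"{client.dataset_path}/{table_name}"
    return catalog


def get_catalog(
    client: Client, table_name: str, namespace_name: Optional[str] = None
) -> InMemoryCatalog:
    # Orchestrator: same order as in the diff (catalog, namespace, table).
    catalog = get_sql_catalog()
    if namespace_name is None:
        namespace_name = client.dataset_name  # default to the dataset name
    catalog.create_namespace(namespace_name)
    return create_or_evolve_table(catalog, client, table_name, namespace_name)


client = Client(dataset_name="my_dataset", dataset_path="/data/my_dataset")
catalog = get_catalog(client, "events")
print(catalog.tables)  # {'my_dataset.events': '/data/my_dataset/events'}
```

With this shape, the catalog factory and the table registration step can each be exercised on their own, which is what the split in this commit appears to aim for.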