Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/table iceberg #13

Closed
wants to merge 14 commits into from
17 changes: 17 additions & 0 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import boto3
from botocore.exceptions import ClientError
from itertools import chain
from os import path
from threading import Lock
from typing import Dict, Iterator, List, Optional, Set
from uuid import uuid4
Expand Down Expand Up @@ -53,6 +54,22 @@ def s3_uuid_table_location(self):

return f"{client.s3_staging_dir}tables/{str(uuid4())}/"

@available
def get_unique_external_location(self, external_location, strict_location, staging_dir, relation_name):
    """
    Generate a unique, non-overlapping storage location for a table.

    Args:
        external_location: Location configured by the user, or ``None``.
        strict_location: When truthy, a configured ``external_location`` is
            returned unchanged. When falsy, a UUID suffix is appended to it.
        staging_dir: Base directory used when no ``external_location`` is
            configured.
        relation_name: Relation name used as the prefix of the generated
            directory under ``staging_dir``.

    Returns:
        The location to use. Generated locations always end with ``/``.
    """
    # Local import keeps this method self-contained. posixpath always joins
    # with '/', whereas os.path.join would emit '\\' on Windows and produce
    # an invalid object-store URI.
    import posixpath

    unique_id = str(uuid4())

    if external_location is None:
        # Nothing configured: derive a fresh directory under the staging dir.
        base_path = posixpath.join(staging_dir, f'{relation_name}_{unique_id}')
        return f'{base_path}/'

    if strict_location:
        # Caller asked for the configured location to be used verbatim.
        return external_location

    # Drop a single trailing slash so the suffix extends the last path
    # segment instead of creating an empty-named child directory.
    if external_location.endswith('/'):
        external_location = external_location[:-1]
    return f'{external_location}_{unique_id}/'

@available
def clean_up_partitions(
self, database_name: str, table_name: str, where_condition: str
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{# Render a `drop table if exists` statement for the given relation. #}
{% macro drop_iceberg(relation) -%}
drop table if exists {{ relation }}
{% endmacro %}

{#
  Build an Iceberg table from `sql` by way of an intermediate tmp table.

  Steps, in order:
    1. drop `tmp_relation` (when one was passed),
    2. run create_tmp_table_iceberg(tmp_relation, sql, staging_location),
    3. read the tmp table's columns,
    4. run drop_iceberg(old_relation) when an old relation exists,
    5. run create_iceberg_table_definition(target_relation, dest_columns).

  Returns the result of incremental_insert(tmp_relation, target_relation);
  the caller is expected to execute it.

  NOTE(review): the target is derived from `this`, not from the `relation`
  argument, which is not referenced in the body -- confirm this is intended.
#}
{% macro create_table_iceberg(relation, old_relation, tmp_relation, sql) -%}
{%- set staging_location = config.get('staging_location') -%}

{%- set target_relation = this.incorporate(type='table') -%}

{% if tmp_relation is not none %}
{% do adapter.drop_relation(tmp_relation) %}
{% endif %}

-- create tmp table
{% do run_query(create_tmp_table_iceberg(tmp_relation, sql, staging_location)) %}

-- get columns from tmp table to retrieve metadata
{%- set dest_columns = adapter.get_columns_in_relation(tmp_relation) -%}

-- drop old relation after tmp table is ready
{%- if old_relation is not none -%}
{% do run_query(drop_iceberg(old_relation)) %}
{%- endif -%}

-- create iceberg table
{% do run_query(create_iceberg_table_definition(target_relation, dest_columns)) %}

-- return final insert statement
{{ return(incremental_insert(tmp_relation, target_relation)) }}

{% endmacro %}


{#
  Render a `create table ... as` statement that materializes `sql` into
  `relation`, with write_compression='snappy' and format='parquet'.

  NOTE(review): `staging_location` is accepted but never referenced in the
  body -- confirm whether it was meant to set the table's location.
#}
{% macro create_tmp_table_iceberg(relation, sql, staging_location) -%}
create table
{{ relation }}

with (
write_compression='snappy',
format='parquet'
)
as
{{ sql }}
{% endmacro %}

{#
  Render the CREATE TABLE DDL for an Iceberg table.

  Arguments:
    relation      -- relation to create; `relation.name` is also passed to
                     adapter.get_unique_external_location.
    dest_columns  -- columns (each exposing `name` and `dtype`) used to build
                     the column list; every dtype goes through iceberg_data_type.

  Config read: external_location, strict_location (default true),
  partitioned_by, table_properties (default {}). The property
  'table_type' = 'ICEBERG' is always set on the rendered properties.
#}
{% macro create_iceberg_table_definition(relation, dest_columns) -%}
{%- set external_location = config.get('external_location', default=none) -%}
{%- set strict_location = config.get('strict_location', default=true) -%}
{%- set partitioned_by = config.get('partitioned_by', default=none) -%}
{#- Work on a copy so that forcing table_type below does not mutate the dict handed back by config.get. -#}
{%- set table_properties = config.get('table_properties', default={}).copy() -%}
{%- set _ = table_properties.update({'table_type': 'ICEBERG'}) -%}
{%- set table_properties_formatted = [] -%}
{%- set dest_columns_with_type = [] -%}

{#- `~` converts both operands to strings, so non-string property values (integers, booleans) no longer break the concatenation. -#}
{%- for k in table_properties -%}
{% set _ = table_properties_formatted.append("'" ~ k ~ "'='" ~ table_properties[k] ~ "'") -%}
{%- endfor -%}

{%- set table_properties_csv= table_properties_formatted | join(', ') -%}

{%- set external_location = adapter.get_unique_external_location(external_location, strict_location, target.s3_staging_dir, relation.name) -%}

{%- for col in dest_columns -%}
{% set dtype = iceberg_data_type(col.dtype) -%}
{% set _ = dest_columns_with_type.append(col.name + ' ' + dtype) -%}
{%- endfor -%}

{%- set dest_columns_with_type_csv = dest_columns_with_type | join(', ') -%}

CREATE TABLE {{ relation }} (
{{ dest_columns_with_type_csv }}
)
{%- if partitioned_by is not none %}
{%- set partitioned_by_csv = partitioned_by | join(', ') -%}
PARTITIONED BY ({{partitioned_by_csv}})
{%- endif %}
LOCATION '{{ external_location }}'
TBLPROPERTIES (
{{table_properties_csv}}
)

{% endmacro %}

{#
  Map a column type string to the type name used in the Iceberg DDL:
  anything containing 'varchar' becomes 'string', exactly 'integer' becomes
  'int', and every other type is passed through unchanged.
#}
{% macro iceberg_data_type(col_type) -%}
{%- set mapped_type = 'string' if 'varchar' in col_type
                      else ('int' if col_type == 'integer' else col_type) -%}
{{ return(mapped_type) }}
{% endmacro %}
29 changes: 23 additions & 6 deletions dbt/include/athena/macros/materializations/models/table/table.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{% materialization table, adapter='athena' -%}
{%- set identifier = model['alias'] -%}

{%- set format = config.get('format', default='parquet') -%}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
Expand All @@ -9,17 +9,34 @@

{{ run_hooks(pre_hooks) }}

-- drop old relation if it is not iceberg
{%- if old_relation is not none -%}
{{ adapter.drop_relation(old_relation) }}
{%- if format != 'iceberg' -%}
{{ adapter.drop_relation(old_relation) }}
{%- endif -%}
{%- endif -%}

-- build model
{% call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{% endcall -%}
{%- if format == 'iceberg' -%}
{%- set tmp_relation = make_temp_relation(target_relation) -%}
{%- set build_sql = create_table_iceberg(target_relation, old_relation, tmp_relation, sql) -%}
{% else %}
{% set build_sql = create_table_as(False, target_relation, sql) -%}
{%- endif -%}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

-- drop tmp table in case of iceberg
{%- if format == 'iceberg' -%}
{% do adapter.drop_relation(tmp_relation) %}
{%- endif -%}

-- set table properties
{{ set_table_classification(target_relation, 'parquet') }}
{%- if format != 'iceberg' -%}
{{ set_table_classification(target_relation, format) }}
{%- endif -%}

{{ run_hooks(post_hooks) }}

Expand Down