diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index 8e6dcb7b..4296554e 100755 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -3,6 +3,7 @@ import boto3 from botocore.exceptions import ClientError from itertools import chain +from os import path from threading import Lock from typing import Dict, Iterator, List, Optional, Set from uuid import uuid4 @@ -53,6 +54,22 @@ def s3_uuid_table_location(self): return f"{client.s3_staging_dir}tables/{str(uuid4())}/" + @available + def get_unique_external_location(self, external_location, strict_location, staging_dir, relation_name): + """ + Generate a unique not overlapping location. + """ + unique_id = str(uuid4()) + if external_location is not None: + if not strict_location: + if external_location.endswith('/'): + external_location = external_location[:-1] + external_location = f'{external_location}_{unique_id}/' + else: + base_path = path.join(staging_dir, f'{relation_name}_{unique_id}') + external_location = f'{base_path}/' + return external_location + @available def clean_up_partitions( self, database_name: str, table_name: str, where_condition: str diff --git a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql new file mode 100644 index 00000000..9aeb549c --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql @@ -0,0 +1,99 @@ +{% macro drop_iceberg(relation) -%} + drop table if exists {{ relation }} +{% endmacro %} + +{% macro create_table_iceberg(relation, old_relation, tmp_relation, sql) -%} + {%- set external_location = config.get('external_location', default=none) -%} + {%- set staging_location = config.get('staging_location') -%} + {%- set partitioned_by = config.get('partitioned_by', default=none) -%} + {%- set bucketed_by = config.get('bucketed_by', default=none) -%} + {%- set bucket_count = config.get('bucket_count', default=none) -%} + {%- set write_compression = config.get('write_compression', default=none) -%} + + {%- set target_relation = this.incorporate(type='table') -%} + + {% if tmp_relation is not none %} + {% do adapter.drop_relation(tmp_relation) %} + {% endif %} + + -- create tmp table + {% do run_query(create_tmp_table_iceberg(tmp_relation, sql, staging_location)) %} + + -- get columns from tmp table to retrieve metadata + {%- set dest_columns = adapter.get_columns_in_relation(tmp_relation) -%} + + -- drop old relation after tmp table is ready + {%- if old_relation is not none -%} + {% do run_query(drop_iceberg(old_relation)) %} + {%- endif -%} + + -- create iceberg table + {% do run_query(create_iceberg_table_definition(target_relation, dest_columns)) %} + + -- return final insert statement + {{ return(incremental_insert(tmp_relation, target_relation)) }} + +{% endmacro %} + + +{% macro create_tmp_table_iceberg(relation, sql, staging_location) -%} + create table + {{ relation }} + + with ( + write_compression='snappy', + format='parquet' + ) + as + {{ sql }} +{% endmacro %} + +{% macro create_iceberg_table_definition(relation, dest_columns) -%} + {%- set external_location = config.get('external_location', default=none) -%} + {%- set strict_location = config.get('strict_location', default=true) -%} + {%- set partitioned_by = config.get('partitioned_by', default=none) -%} + {%- set table_properties = config.get('table_properties', default={}) -%} + {%- set _ = table_properties.update({'table_type': 'ICEBERG'}) -%} + {%- set table_properties_formatted = [] -%} + {%- set dest_columns_with_type = [] -%} + + {%- for k in table_properties -%} + {% set _ = table_properties_formatted.append("'" + k + "'='" + table_properties[k] + "'") -%} + {%- endfor -%} + + {%- set table_properties_csv= table_properties_formatted | join(', ') -%} + + {%- set external_location = adapter.get_unique_external_location(external_location, strict_location, target.s3_staging_dir, relation.name) -%} + + {%- for col in dest_columns -%} + {% set dtype = iceberg_data_type(col.dtype) -%} + {% set _ = dest_columns_with_type.append(col.name + ' ' + dtype) -%} + {%- endfor -%} + + {%- set dest_columns_with_type_csv = dest_columns_with_type | join(', ') -%} + + CREATE TABLE {{ relation }} ( + {{ dest_columns_with_type_csv }} + ) + {%- if partitioned_by is not none %} + {%- set partitioned_by_csv = partitioned_by | join(', ') -%} + PARTITIONED BY ({{partitioned_by_csv}}) + {%- endif %} + LOCATION '{{ external_location }}' + TBLPROPERTIES ( + {{table_properties_csv}} + ) + +{% endmacro %} + +{% macro iceberg_data_type(col_type) -%} + {%- if 'varchar' in col_type -%} + {% set data_type = 'string' -%} + {%- elif 'integer' == col_type -%} + {% set data_type = 'int' -%} + {%- else -%} + {% set data_type = col_type -%} + {%- endif -%} + + {{ return(data_type) }} +{% endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/table/table.sql b/dbt/include/athena/macros/materializations/models/table/table.sql index ad425f00..2c8bd9f0 100644 --- a/dbt/include/athena/macros/materializations/models/table/table.sql +++ b/dbt/include/athena/macros/materializations/models/table/table.sql @@ -1,6 +1,6 @@ {% materialization table, adapter='athena' -%} {%- set identifier = model['alias'] -%} - + {%- set format = config.get('format', default='parquet') -%} {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, @@ -9,17 +9,34 @@ {{ run_hooks(pre_hooks) }} + -- drop old relation if it is not iceberg {%- if old_relation is not none -%} - {{ adapter.drop_relation(old_relation) }} + {%- if format != 'iceberg' -%} + {{ adapter.drop_relation(old_relation) }} + {%- endif -%} {%- endif -%} -- build model - {% call statement('main') -%} - {{ create_table_as(False, target_relation, sql) }} - {% endcall -%} + {%- if format == 'iceberg' -%} + {%- set tmp_relation = make_temp_relation(target_relation) -%} + {%- set build_sql = create_table_iceberg(target_relation, old_relation, tmp_relation, sql) -%} + {% else %} + {% set build_sql = create_table_as(False, target_relation, sql) -%} + {%- endif -%} + + {% call statement("main") %} + {{ build_sql }} + {% endcall %} + + -- drop tmp table in case of iceberg + {%- if format == 'iceberg' -%} + {% do adapter.drop_relation(tmp_relation) %} + {%- endif -%} -- set table properties - {{ set_table_classification(target_relation, 'parquet') }} + {%- if format != 'iceberg' -%} + {{ set_table_classification(target_relation, format) }} + {%- endif -%} {{ run_hooks(post_hooks) }}