diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 00000000..96f39441 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +recursive-include dbt/include *.sql *.yml diff --git a/README.md b/README.md index 012b3307..42f32459 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # dbt-athena -* Supports dbt version `0.21.0` +* Supports dbt version `1.0.*` * Supports [Seeds][seeds] * Correctly detects views and their columns * Support [incremental models][incremental] diff --git a/dbt/adapters/athena/__version__.py b/dbt/adapters/athena/__version__.py index 025ca235..86774adc 100644 --- a/dbt/adapters/athena/__version__.py +++ b/dbt/adapters/athena/__version__.py @@ -1 +1 @@ -version = "0.21.0" +version = "1.0.1" diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py index 9b6952f1..cc30862c 100644 --- a/dbt/adapters/athena/connections.py +++ b/dbt/adapters/athena/connections.py @@ -20,12 +20,15 @@ from dbt.contracts.connection import Connection, AdapterResponse from dbt.adapters.sql import SQLConnectionManager from dbt.exceptions import RuntimeException, FailedToConnectException -from dbt.logger import GLOBAL_LOGGER as logger +from dbt.events import AdapterLogger + import tenacity from tenacity.retry import retry_if_exception from tenacity.stop import stop_after_attempt from tenacity.wait import wait_exponential +logger = AdapterLogger("Athena") + @dataclass class AthenaCredentials(Credentials): diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index c0009ab3..a8f5fbc2 100644 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -3,12 +3,14 @@ import re import boto3.session from botocore.exceptions import ClientError +from typing import Optional from dbt.adapters.base import available from dbt.adapters.sql import SQLAdapter from dbt.adapters.athena import AthenaConnectionManager from dbt.adapters.athena.relation import AthenaRelation -from dbt.logger import GLOBAL_LOGGER as logger +from dbt.events import AdapterLogger +logger = AdapterLogger("Athena") class AthenaAdapter(SQLAdapter): ConnectionManager = AthenaConnectionManager @@ -99,3 +101,9 @@ def clean_up_table( s3_resource = session.resource('s3') s3_bucket = s3_resource.Bucket(bucket_name) s3_bucket.objects.filter(Prefix=prefix).delete() + + @available + def quote_seed_column( + self, column: str, quote_config: Optional[bool] + ) -> str: + return super().quote_seed_column(column, False) diff --git a/dbt/include/athena/macros/adapters.sql b/dbt/include/athena/macros/adapters.sql deleted file mode 100644 index a02bb339..00000000 --- a/dbt/include/athena/macros/adapters.sql +++ /dev/null @@ -1,125 +0,0 @@ -{% macro set_table_classification(relation, default_value) -%} - {%- set format = config.get('format', default=default_value) -%} - - {% call statement('set_table_classification', auto_begin=False) -%} - alter table {{ relation }} set tblproperties ('classification' = '{{ format }}') - {%- endcall %} -{%- endmacro %} - -{% macro athena__create_table_as(temporary, relation, sql) -%} - {%- set external_location = config.get('external_location', default=none) -%} - {%- set partitioned_by = config.get('partitioned_by', default=none) -%} - {%- set bucketed_by = config.get('bucketed_by', default=none) -%} - {%- set bucket_count = config.get('bucket_count', default=none) -%} - {%- set field_delimiter = config.get('field_delimiter', default=none) -%} - {%- set format = config.get('format', default='parquet') -%} - - create table - {{ relation }} - - with ( - {%- if external_location is not none and not temporary %} - external_location='{{ external_location }}', - {%- endif %} - {%- if partitioned_by is not none %} - partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }}, - {%- endif %} - {%- if bucketed_by is not none %} - bucketed_by=ARRAY{{ bucketed_by | tojson | replace('\"', '\'') }}, - {%- endif %} - {%- if bucket_count is not none %} - bucket_count={{ bucket_count }}, - {%- endif %} - {%- if field_delimiter is not none %} - field_delimiter='{{ field_delimiter }}', - {%- endif %} - format='{{ format }}' - ) - as - {{ sql }} -{% endmacro %} - -{% macro athena__create_view_as(relation, sql) -%} - create or replace view - {{ relation }} - as - {{ sql }} -{% endmacro %} - -{% macro athena__list_schemas(database) -%} - {% call statement('list_schemas', fetch_result=True) %} - select - distinct schema_name - - from {{ information_schema_name(database) }}.schemata - {% endcall %} - {{ return(load_result('list_schemas').table) }} -{% endmacro %} - -{% macro athena__list_relations_without_caching(schema_relation) %} - {% call statement('list_relations_without_caching', fetch_result=True) -%} - WITH views AS ( - select - table_catalog as database, - table_name as name, - table_schema as schema - from {{ schema_relation.information_schema() }}.views - where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}') - ), tables AS ( - select - table_catalog as database, - table_name as name, - table_schema as schema - - from {{ schema_relation.information_schema() }}.tables - where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}') - - -- Views appear in both `tables` and `views`, so excluding them from tables - EXCEPT - - select * from views - ) - select views.*, 'view' AS table_type FROM views - UNION ALL - select tables.*, 'table' AS table_type FROM tables - {% endcall %} - {% do return(load_result('list_relations_without_caching').table) %} -{% endmacro %} - -{% macro athena__get_columns_in_relation(relation) -%} - {% call statement('get_columns_in_relation', fetch_result=True) %} - - select - column_name, - data_type, - null as character_maximum_length, - null as numeric_precision, - null as numeric_scale - - from {{ relation.information_schema('columns') }} - where LOWER(table_name) = LOWER('{{ relation.identifier }}') - {% if relation.schema %} - and LOWER(table_schema) = LOWER('{{ relation.schema }}') - {% endif %} - order by ordinal_position - - {% endcall %} - - {% set table = load_result('get_columns_in_relation').table %} - {% do return(sql_convert_columns_in_relation(table)) %} -{% endmacro %} - -{% macro athena__drop_relation(relation) -%} - {% if config.get('incremental_strategy') == 'insert_overwrite' %} - {%- do adapter.clean_up_table(relation.schema, relation.table) -%} - {% endif %} - {% call statement('drop_relation', auto_begin=False) -%} - drop {{ relation.type }} if exists {{ relation }} - {%- endcall %} -{% endmacro %} - -{% macro athena__current_timestamp() -%} - -- pyathena converts time zoned timestamps to strings so lets avoid them - -- now() - cast(now() as timestamp) -{%- endmacro %} diff --git a/dbt/include/athena/macros/adapters/columns.sql b/dbt/include/athena/macros/adapters/columns.sql new file mode 100644 index 00000000..2c62f4d7 --- /dev/null +++ b/dbt/include/athena/macros/adapters/columns.sql @@ -0,0 +1,22 @@ +{% macro athena__get_columns_in_relation(relation) -%} + {% call statement('get_columns_in_relation', fetch_result=True) %} + + select + column_name, + data_type, + null as character_maximum_length, + null as numeric_precision, + null as numeric_scale + + from {{ relation.information_schema('columns') }} + where LOWER(table_name) = LOWER('{{ relation.identifier }}') + {% if relation.schema %} + and LOWER(table_schema) = LOWER('{{ relation.schema }}') + {% endif %} + order by ordinal_position + + {% endcall %} + + {% set table = load_result('get_columns_in_relation').table %} + {% do return(sql_convert_columns_in_relation(table)) %} +{% endmacro %} diff --git a/dbt/include/athena/macros/adapters/freshness.sql b/dbt/include/athena/macros/adapters/freshness.sql new file mode 100644 index 00000000..b980d220 --- /dev/null +++ b/dbt/include/athena/macros/adapters/freshness.sql @@ -0,0 +1,5 @@ +{% macro athena__current_timestamp() -%} + -- pyathena converts time zoned timestamps to strings so lets avoid them + -- now() + cast(now() as timestamp) +{%- endmacro %} diff --git a/dbt/include/athena/macros/catalog.sql b/dbt/include/athena/macros/adapters/metadata.sql similarity index 68% rename from dbt/include/athena/macros/catalog.sql rename to dbt/include/athena/macros/adapters/metadata.sql index 73b96d8b..a6e9f1c9 100644 --- a/dbt/include/athena/macros/catalog.sql +++ b/dbt/include/athena/macros/adapters/metadata.sql @@ -1,4 +1,3 @@ - {% macro athena__get_catalog(information_schema, schemas) -%} {%- set query -%} select * from ( @@ -77,3 +76,45 @@ {{ return(run_query(query)) }} {%- endmacro %} + + +{% macro athena__list_schemas(database) -%} + {% call statement('list_schemas', fetch_result=True) %} + select + distinct schema_name + + from {{ information_schema_name(database) }}.schemata + {% endcall %} + {{ return(load_result('list_schemas').table) }} +{% endmacro %} + + +{% macro athena__list_relations_without_caching(schema_relation) %} + {% call statement('list_relations_without_caching', fetch_result=True) -%} + WITH views AS ( + select + table_catalog as database, + table_name as name, + table_schema as schema + from {{ schema_relation.information_schema() }}.views + where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}') + ), tables AS ( + select + table_catalog as database, + table_name as name, + table_schema as schema + + from {{ schema_relation.information_schema() }}.tables + where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}') + + -- Views appear in both `tables` and `views`, so excluding them from tables + EXCEPT + + select * from views + ) + select views.*, 'view' AS table_type FROM views + UNION ALL + select tables.*, 'table' AS table_type FROM tables + {% endcall %} + {% do return(load_result('list_relations_without_caching').table) %} +{% endmacro %} diff --git a/dbt/include/athena/macros/adapters/relation.sql b/dbt/include/athena/macros/adapters/relation.sql new file mode 100644 index 00000000..7e73aca7 --- /dev/null +++ b/dbt/include/athena/macros/adapters/relation.sql @@ -0,0 +1,8 @@ +{% macro athena__drop_relation(relation) -%} + {% if config.get('incremental_strategy') == 'insert_overwrite' %} + {%- do adapter.clean_up_table(relation.schema, relation.table) -%} + {% endif %} + {% call statement('drop_relation', auto_begin=False) -%} + drop {{ relation.type }} if exists {{ relation }} + {%- endcall %} +{% endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/helpers.sql b/dbt/include/athena/macros/materializations/models/helpers.sql new file mode 100644 index 00000000..74148a9d --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/helpers.sql @@ -0,0 +1,7 @@ +{% macro set_table_classification(relation, default_value) -%} + {%- set format = config.get('format', default=default_value) -%} + + {% call statement('set_table_classification', auto_begin=False) -%} + alter table {{ relation }} set tblproperties ('classification' = '{{ format }}') + {%- endcall %} +{%- endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/incremental/helpers.sql b/dbt/include/athena/macros/materializations/models/incremental/helpers.sql new file mode 100644 index 00000000..3f6b1f59 --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/incremental/helpers.sql @@ -0,0 +1,54 @@ +{% macro validate_get_incremental_strategy(raw_strategy) %} + {% set invalid_strategy_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + Expected one of: 'append', 'insert_overwrite' + {%- endset %} + + {% if raw_strategy not in ['append', 'insert_overwrite'] %} + {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} + {% endif %} + + {% do return(raw_strategy) %} +{% endmacro %} + +{% macro incremental_insert(tmp_relation, target_relation, statement_name="main") %} + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + + insert into {{ target_relation }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ tmp_relation }} + ); +{%- endmacro %} + +{% macro delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %} + {%- set partitioned_keys = partitioned_by | tojson | replace('\"', '') | replace('[', '') | replace(']', '') -%} + {% call statement('get_partitions', fetch_result=True) %} + select distinct {{partitioned_keys}} from {{ tmp_relation }}; + {% endcall %} + {%- set table = load_result('get_partitions').table -%} + {%- set rows = table.rows -%} + {%- set partitions = [] -%} + {%- for row in rows -%} + {%- set single_partition = [] -%} + {%- for col in row -%} + {%- set column_type = adapter.convert_type(table, loop.index0) -%} + {%- if column_type == 'integer' -%} + {%- set value = col|string -%} + {%- elif column_type == 'string' -%} + {%- set value = "'" + col + "'" -%} + {%- elif column_type == 'date' -%} + {%- set value = "'" + col|string + "'" -%} + {%- else -%} + {%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%} + {%- endif -%} + {%- do single_partition.append(partitioned_by[loop.index0] + '=' + value) -%} + {%- endfor -%} + {%- set single_partition_expression = single_partition | join(' and ') -%} + {%- do partitions.append('(' + single_partition_expression + ')') -%} + {%- endfor -%} + {%- for i in range(partitions | length) %} + {%- do adapter.clean_up_partitions(target_relation.schema, target_relation.table, partitions[i]) -%} + {%- endfor -%} +{%- endmacro %} diff --git a/dbt/include/athena/macros/materializations/incremental.sql b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql similarity index 53% rename from dbt/include/athena/macros/materializations/incremental.sql rename to dbt/include/athena/macros/materializations/models/incremental/incremental.sql index 8c82d1e9..f20fee5e 100644 --- a/dbt/include/athena/macros/materializations/incremental.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql @@ -1,58 +1,3 @@ -{% macro validate_get_incremental_strategy(raw_strategy) %} - {% set invalid_strategy_msg -%} - Invalid incremental strategy provided: {{ raw_strategy }} - Expected one of: 'append', 'insert_overwrite' - {%- endset %} - - {% if raw_strategy not in ['append', 'insert_overwrite'] %} - {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} - {% endif %} - - {% do return(raw_strategy) %} -{% endmacro %} - -{% macro incremental_insert(tmp_relation, target_relation, statement_name="main") %} - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} - {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - - insert into {{ target_relation }} ({{ dest_cols_csv }}) - ( - select {{ dest_cols_csv }} - from {{ tmp_relation }} - ); -{%- endmacro %} - -{% macro delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %} - {%- set partitioned_keys = partitioned_by | tojson | replace('\"', '') | replace('[', '') | replace(']', '') -%} - {% call statement('get_partitions', fetch_result=True) %} - select distinct {{partitioned_keys}} from {{ tmp_relation }}; - {% endcall %} - {%- set table = load_result('get_partitions').table -%} - {%- set rows = table.rows -%} - {%- set partitions = [] -%} - {%- for row in rows -%} - {%- set single_partition = [] -%} - {%- for col in row -%} - {%- set column_type = adapter.convert_type(table, loop.index0) -%} - {%- if column_type == 'integer' -%} - {%- set value = col|string -%} - {%- elif column_type == 'string' -%} - {%- set value = "'" + col + "'" -%} - {%- elif column_type == 'date' -%} - {%- set value = "'" + col|string + "'" -%} - {%- else -%} - {%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%} - {%- endif -%} - {%- do single_partition.append(partitioned_by[loop.index0] + '=' + value) -%} - {%- endfor -%} - {%- set single_partition_expression = single_partition | join(' and ') -%} - {%- do partitions.append('(' + single_partition_expression + ')') -%} - {%- endfor -%} - {%- for i in range(partitions | length) %} - {%- do adapter.clean_up_partitions(target_relation.schema, target_relation.table, partitions[i]) -%} - {%- endfor -%} -{%- endmacro %} - {% materialization incremental, adapter='athena' -%} {% set unique_key = config.get('unique_key') %} diff --git a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql new file mode 100644 index 00000000..504ba148 --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql @@ -0,0 +1,32 @@ +{% macro athena__create_table_as(temporary, relation, sql) -%} + {%- set external_location = config.get('external_location', default=none) -%} + {%- set partitioned_by = config.get('partitioned_by', default=none) -%} + {%- set bucketed_by = config.get('bucketed_by', default=none) -%} + {%- set bucket_count = config.get('bucket_count', default=none) -%} + {%- set field_delimiter = config.get('field_delimiter', default=none) -%} + {%- set format = config.get('format', default='parquet') -%} + + create table + {{ relation }} + + with ( + {%- if external_location is not none and not temporary %} + external_location='{{ external_location }}', + {%- endif %} + {%- if partitioned_by is not none %} + partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }}, + {%- endif %} + {%- if bucketed_by is not none %} + bucketed_by=ARRAY{{ bucketed_by | tojson | replace('\"', '\'') }}, + {%- endif %} + {%- if bucket_count is not none %} + bucket_count={{ bucket_count }}, + {%- endif %} + {%- if field_delimiter is not none %} + field_delimiter='{{ field_delimiter }}', + {%- endif %} + format='{{ format }}' + ) + as + {{ sql }} +{% endmacro %} diff --git a/dbt/include/athena/macros/materializations/table.sql b/dbt/include/athena/macros/materializations/models/table/table.sql similarity index 99% rename from dbt/include/athena/macros/materializations/table.sql rename to dbt/include/athena/macros/materializations/models/table/table.sql index b0e1e2c3..ad425f00 100644 --- a/dbt/include/athena/macros/materializations/table.sql +++ b/dbt/include/athena/macros/materializations/models/table/table.sql @@ -1,4 +1,3 @@ - {% materialization table, adapter='athena' -%} {%- set identifier = model['alias'] -%} diff --git a/dbt/include/athena/macros/materializations/view.sql b/dbt/include/athena/macros/materializations/models/view/create_or_replace_view.sql similarity index 83% rename from dbt/include/athena/macros/materializations/view.sql rename to dbt/include/athena/macros/materializations/models/view/create_or_replace_view.sql index bcc0d6d0..bf7c0699 100644 --- a/dbt/include/athena/macros/materializations/view.sql +++ b/dbt/include/athena/macros/materializations/models/view/create_or_replace_view.sql @@ -38,13 +38,3 @@ {{ return({'relations': [target_relation]}) }} {% endmacro %} - - -{% materialization view, adapter='athena' -%} - {% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %} - - {% set target_relation = this.incorporate(type='view') %} - {% do persist_docs(target_relation, model) %} - - {% do return(to_return) %} -{%- endmaterialization %} diff --git a/dbt/include/athena/macros/materializations/models/view/create_view_as.sql b/dbt/include/athena/macros/materializations/models/view/create_view_as.sql new file mode 100644 index 00000000..5871a739 --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/view/create_view_as.sql @@ -0,0 +1,6 @@ +{% macro athena__create_view_as(relation, sql) -%} + create or replace view + {{ relation }} + as + {{ sql }} +{% endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/view/view.sql b/dbt/include/athena/macros/materializations/models/view/view.sql new file mode 100644 index 00000000..3b1a4a89 --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/view/view.sql @@ -0,0 +1,8 @@ +{% materialization view, adapter='athena' -%} + {% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %} + + {% set target_relation = this.incorporate(type='view') %} + {% do persist_docs(target_relation, model) %} + + {% do return(to_return) %} +{%- endmaterialization %} diff --git a/dbt/include/athena/macros/materializations/seed.sql b/dbt/include/athena/macros/materializations/seeds/helpers.sql similarity index 99% rename from dbt/include/athena/macros/materializations/seed.sql rename to dbt/include/athena/macros/materializations/seeds/helpers.sql index c1cb4afe..bbc0e0e0 100644 --- a/dbt/include/athena/macros/materializations/seed.sql +++ b/dbt/include/athena/macros/materializations/seeds/helpers.sql @@ -1,4 +1,3 @@ - {% macro default__reset_csv_table(model, full_refresh, old_relation, agate_table) %} {% set sql = "" %} -- No truncate in Athena so always drop CSV table and recreate diff --git a/dbt/include/athena/macros/materializations/snapshot.sql b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql similarity index 99% rename from dbt/include/athena/macros/materializations/snapshot.sql rename to dbt/include/athena/macros/materializations/snapshots/snapshot.sql index f1464d12..bd384ab4 100644 --- a/dbt/include/athena/macros/materializations/snapshot.sql +++ b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql @@ -1,4 +1,3 @@ - {% materialization snapshot, adapter='athena' -%} {{ exceptions.raise_not_implemented( 'snapshot materialization not implemented for '+adapter.type()) diff --git a/dbt/include/athena/profile_template.yml b/dbt/include/athena/profile_template.yml new file mode 100644 index 00000000..d7600565 --- /dev/null +++ b/dbt/include/athena/profile_template.yml @@ -0,0 +1,14 @@ +fixed: + type: athena +prompts: + s3_staging_dir: + hint: S3 location to store Athena query results and metadata + + region_name: + hint: AWS region of your Athena instance + + schema: + hint: Specify the schema (Athena database) to build models into (lowercase only) + + database: + hint: Specify the database (Data catalog) to build models into (lowercase only) diff --git a/requirements.txt b/requirements.txt index 580ac925..055417ad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -dbt-core==0.21.0 +dbt-core==1.0.* pyathena==2.2.0 boto3==1.18.12 -tenacity==6.3.1 \ No newline at end of file +tenacity==6.3.1 diff --git a/setup.py b/setup.py index 78adbdf7..81292f8b 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ def _dbt_athena_version() -> str: description = """The athena adapter plugin for dbt (data build tool)""" -dbt_version = "0.21.0" +dbt_version = "1.0" if not package_version.startswith(dbt_version): raise ValueError( @@ -50,16 +50,9 @@ def _dbt_athena_version() -> str: author_email="tomelvey@googlemail.com", url="https://github.com/Tomme/dbt-athena", packages=find_namespace_packages(include=["dbt", "dbt.*"]), - package_data={ - "dbt": [ - "include/athena/dbt_project.yml", - "include/athena/sample_profiles.yml", - "include/athena/macros/*.sql", - "include/athena/macros/*/*.sql", - ] - }, + include_package_data=True, install_requires=[ - "dbt-core==0.21.0", + "dbt-core~=1.0.0", "pyathena==2.2.0", "boto3==1.18.12", "tenacity==6.3.1",