Skip to content

Commit

Permalink
Merge pull request #17 from pageuppeople-opensource/0.15-support
Browse files Browse the repository at this point in the history
0.15 support
  • Loading branch information
elexisvenator authored Nov 27, 2019
2 parents c7ae89e + eedd1a8 commit b13b6d2
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 80 deletions.
2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name: 'pageup_dbt_utils'
version: '3.0.0'

require-dbt-version: ">=0.14.3"
require-dbt-version: ">=0.15.0"

profile: 'default'

Expand Down
131 changes: 52 additions & 79 deletions macros/materializations/timestamp_incremental.sql
Original file line number Diff line number Diff line change
@@ -1,99 +1,72 @@
{#/* Based on the incremental materialization from dbt. */#}
{% macro pageup__timestamp_incremental_delete(target_relation, tmp_relation) -%}

{%- set unique_key = config.require('unique_key') -%}

delete
from {{ target_relation }}
where ({{ unique_key }}) in (
select ({{ unique_key }})
from {{ tmp_relation.include(schema=False, database=False) }}
);

{% macro build_timestamp_incremental_sql(target_relation, sql) -%}
{%- set timestamp_suffix = var('TIMESTAMP_SUFFIX', 'model_timestamp') -%}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{#- We are using a subselect instead of a CTE here to allow PostgreSQL to use indexes. #}
select *
from (
{{ sql }}
) as dbt_incr_sbq

{#- Generate a check for each timestamp column to ensure we only update changed rows -#}
{#- Note: it doesnt have to be a timestamp, any comparable type will work too, as long as newer rows have a bigger value -#}
{%- for col in dest_columns if col.name.endswith(timestamp_suffix) %}
{% if loop.first %}where {% else %} or {% endif -%}
{{ col.quoted }} > (select max({{ col.quoted }}) from {{ target_relation }})
{%- endfor %}
{%- endmacro %}

{% materialization timestamp_incremental, default -%}
{%- set unique_key = config.require('unique_key') -%}
{%- set timestamp_suffix = var('TIMESTAMP_SUFFIX', 'model_timestamp') -%}

{%- set identifier = model['alias'] -%}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%}
{%- set tmp_relation = make_temp_relation(target_relation) %}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}
{% set unique_key = config.get('unique_key') %}
{% set full_refresh_mode = flags.FULL_REFRESH %}

{%- set should_drop = (full_refresh_mode or exists_not_as_table) -%}
{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

-- setup
{% if old_relation is none -%}
-- noop
{%- elif should_drop -%}
{{ adapter.drop_relation(old_relation) }}
{%- set old_relation = none -%}
{%- endif %}
{# -- set the type so our rename / drop uses the correct syntax #}
{% set backup_type = existing_relation.type | default("table") %}
{% set backup_relation = make_temp_relation(this, "__dbt_backup").incorporate(type=backup_type) %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% if full_refresh_mode or old_relation is none -%}
{%- call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall -%}
{%- else -%}
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{%- call statement() -%}
{% set tmp_table_sql -%}
{#/* We are using a subselect instead of a CTE here to allow PostgreSQL to use indexes. */-#}
select *
from (
{{ sql }}
) as dbt_incr_sbq

{#/* Generate a check for each timestamp column to ensure we only update changed rows */-#}
{#/* Note: it doesnt have to be a timestamp, any comparable type will work too, as long as newer rows have a bigger value */-#}
{%- for col in dest_columns if col.name.endswith(timestamp_suffix) %}
{% if loop.first %}where {% else %} or {% endif -%}
{{ col.quoted }} > (select max({{ col.quoted }}) from {{ target_relation }})
{%- endfor %}

{%- endset %}

{{ dbt.create_table_as(True, tmp_relation, tmp_table_sql) }}

{%- endcall -%}

{{ adapter.expand_target_column_types(from_relation=tmp_relation,
to_relation=target_relation) }}

{%- call statement('main') -%}
{% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}

{% if unique_key is not none -%}

{{ dbt__incremental_delete(target_relation, tmp_relation) }}

{%- endif %}

insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation }}
);
{% endcall %}
{%- endif %}
{% set to_drop = [] %}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% do to_drop.append(backup_relation) %}
{% else %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{# BEGIN MODIFIED CODE #}
{% set incremental_sql = pageup_dbt_utils.build_timestamp_incremental_sql(target_relation, sql) %}
{% do run_query(create_table_as(True, tmp_relation, incremental_sql)) %}
{# END MODIFIED CODE #}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{% endif %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}
{% do adapter.commit() %}

{% for rel in to_drop %}
{% do drop_relation(rel) %}
{% endfor %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

0 comments on commit b13b6d2

Please sign in to comment.