Skip to content

Commit

Permalink
relax uniqueness and drop simple dup rows
Browse files Browse the repository at this point in the history
  • Loading branch information
Laurie Merrell committed Oct 2, 2023
1 parent 82fb1c0 commit 5b2ac0a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
6 changes: 3 additions & 3 deletions warehouse/macros/littlepay_staging_transforms.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ CASE
END
{% endmacro %}

{% macro qualify_dedupe_lp_files(file_dt_col = 'littlepay_export_date', file_ts_col = 'littlepay_export_ts', ts_col = 'ts') %}
{% macro qualify_dedupe_lp_files(instance_col = 'instance', file_dt_col = 'littlepay_export_date', file_ts_col = 'littlepay_export_ts', ts_col = 'ts') %}

-- remove duplicate instances of the same file (file defined as date-level update from LP)
-- partition by instance and file date, order by LP-defined timestamp (most recent first), and then order by our extract timestamp (most recent first)
-- use dense rank instead of row number because we need to allow all rows from a given file to be included (allow ties)
QUALIFY DENSE_RANK()
OVER (PARTITION BY {{ file_dt_col }} ORDER BY {{ file_ts_col }} DESC, {{ ts_col }} DESC) = 1
OVER (PARTITION BY {{ instance_col }}, {{ file_dt_col }} ORDER BY {{ file_ts_col }} DESC, {{ ts_col }} DESC) = 1

{% endmacro %}

{% macro qualify_dedupe_lp_rows(content_hash_col = 'content_hash', file_ts_col = 'littlepay_export_ts', line_number_col = '_line_number') %}
{% macro qualify_dedupe_full_duplicate_lp_rows(content_hash_col = 'content_hash', file_ts_col = 'littlepay_export_ts', line_number_col = '_line_number') %}

-- remove full duplicate rows where *all* content is the same
-- get most recent instance across files and then highest-line-number instance within most recent file
Expand Down
6 changes: 3 additions & 3 deletions warehouse/models/staging/payments/littlepay/_littlepay.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ models:
tests:
- not_null
- unique
- &payments_natural_key
name: _payments_key
- name: _payments_key
description: |
Synthetic key composed of the elements that define a natural key within the source data (primary key according to Littlepay schema).
tests:
- not_null
- unique
- unique_proportion:
at_least: 0.999

- name: stg_littlepay__customer_funding_source
tests:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,35 @@ clean_columns_and_dedupe_files AS (
'retrieval_reference_number', 'littlepay_reference_number', 'external_reference_number',
'response_code', 'status', 'authorisation_date_time_utc']) }} AS content_hash,
FROM source
-- drop extra header rows
WHERE aggregation_id != "aggregation_id"
{{ qualify_dedupe_lp_files() }}
),

-- Attach both surrogate keys to every row of clean_columns_and_dedupe_files, then
-- apply the full-duplicate-row dedupe macro to the result.
add_keys_drop_full_dupes AS (
SELECT
*,
-- generate keys now that input columns have been trimmed & cast and files deduped
-- _key: surrogate key over export date, line number within the file, and instance
{{ dbt_utils.generate_surrogate_key(['littlepay_export_date', '_line_number', 'instance']) }} AS _key,
-- _payments_key: surrogate key over aggregation ID and authorisation timestamp;
-- the source can contain several rows with the same value, hence the later dedupe steps
{{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key,
FROM clean_columns_and_dedupe_files
-- called with its default column arguments (content_hash, littlepay_export_ts, _line_number);
-- per the macro's own comments it removes rows whose entire content is duplicated, keeping the
-- most recent instance across files and the highest-line-number instance within that file
-- NOTE(review): presumably expands to a QUALIFY clause -- macro body is not fully visible here
{{ qualify_dedupe_full_duplicate_lp_rows() }}
),

-- Flag "simple" same-timestamp duplicates: authorisations where one aggregation has
-- more than one row sharing an authorisation timestamp (i.e. the same _payments_key)
-- but only a single distinct non-null retrieval reference number among those rows.
-- These look like clear duplicates; in some of them, one of the two copies is simply
-- missing its status and RRN.
-- Keys flagged here are filtered out by the final select; any remaining duplicate
-- keys need to be handled downstream by checking against settlements data.
-- NOTE(review): to_drop is set per key, not per row, and the final select joins on
-- _payments_key alone, so every row sharing a flagged key is removed -- including
-- the copy that does carry a status/RRN. Confirm that this is intended.
same_timestamp_simple_dupes AS (
    SELECT
        _payments_key,
        TRUE AS to_drop,
        -- COUNT(DISTINCT ...) ignores NULLs, so a pair with one NULL RRN still counts as 1
        COUNT(DISTINCT retrieval_reference_number) AS ct_rrn,
        COUNT(*) AS ct
    FROM add_keys_drop_full_dupes
    GROUP BY _payments_key
    HAVING
        COUNT(*) > 1
        AND COUNT(DISTINCT retrieval_reference_number) = 1
),

stg_littlepay__authorisations AS (
SELECT
participant_id,
Expand All @@ -62,11 +88,12 @@ stg_littlepay__authorisations AS (
littlepay_export_ts,
littlepay_export_date,
ts,
{{ dbt_utils.generate_surrogate_key(['littlepay_export_date', '_line_number', 'instance']) }} AS _key,
{{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key,
FROM clean_columns_and_dedupe_files
{{ qualify_dedupe_lp_rows() }}

_key,
_payments_key,
FROM add_keys_drop_full_dupes
LEFT JOIN same_timestamp_simple_dupes
USING(_payments_key)
WHERE NOT COALESCE(to_drop, FALSE)
)

SELECT * FROM stg_littlepay__authorisations

0 comments on commit 5b2ac0a

Please sign in to comment.