diff --git a/warehouse/models/staging/payments/littlepay/_littlepay.yml b/warehouse/models/staging/payments/littlepay/_littlepay.yml index 59c9d4bb38..c1dbe125ce 100644 --- a/warehouse/models/staging/payments/littlepay/_littlepay.yml +++ b/warehouse/models/staging/payments/littlepay/_littlepay.yml @@ -196,14 +196,6 @@ models: Should ideally be handled by uniqueness of _payments_key but surfaced for troubleshooting. - name: stg_littlepay__customer_funding_source - tests: - - &littlepay_uniqueness - dbt_utils.unique_combination_of_columns: - combination_of_columns: - - instance - - extract_filename - - ts - - _line_number columns: - name: littlepay_export_ts description: Export timestamp parsed from filename. @@ -243,10 +235,20 @@ models: description: The number of records on which the `customer_id` shows up in the table - name: calitp_customer_id_rank description: A ranking of the records using the `customer_id`, ordered newest to oldest + - *lp_export_date + - *lp_export_ts + - *lp_line_number + - *payments_input_row_key + - &payments_key_full_uniqueness + name: _payments_key + description: | + Synthentic key composed of the elements that define a natural key within the source data (primary key according to Littlepay schema.) + tests: + - not_null + - unique + - *_content_hash - name: stg_littlepay__device_transaction_purchases - tests: - - *littlepay_uniqueness columns: - name: littlepay_transaction_id description: Unique tap ID generated by Littlepay upon receipt of a tap to the Device API. @@ -269,10 +271,14 @@ models: description: The value of the purchase. The value may be adjusted by capping rules or tap-off. - name: transaction_time description: The time transaction was created. + - *lp_export_date + - *lp_export_ts + - *lp_line_number + - *payments_input_row_key + - *payments_key_full_uniqueness + - *_content_hash - name: stg_littlepay__device_transactions - tests: - - *littlepay_uniqueness description: In the case of duplicated littlepay_transaction_id values, the one with the latest transaction_date_time_utc in the latest export file takes precedence. columns: - name: customer_id @@ -319,7 +325,7 @@ models: description: The route identifier provided by the device. - name: transaction_date_time_utc description: The date and time that the customer tapped on the device. - - name: transaction_deny_reason + - name: transction_deny_reason description: | The reason a transaction was denied. @@ -349,10 +355,15 @@ models: description: The zone identifier provided by the device. - name: transaction_date_time_pacific description: The date and time that the customer tapped on the device (in Pacific time instead of UTC). + - *lp_export_date + - *lp_export_ts + - *lp_line_number + - *payments_input_row_key + - *payments_key_full_uniqueness + - *_content_hash - name: stg_littlepay__micropayment_adjustments tests: - - *littlepay_uniqueness - dbt_utils.unique_combination_of_columns: combination_of_columns: - micropayment_id @@ -404,10 +415,14 @@ models: The ID of the product with the incentive that was applied to micropayments. Same ID as `product_id`. + - *lp_export_date + - *lp_export_ts + - *lp_line_number + - *payments_input_row_key + - *payments_key_full_uniqueness + - *_content_hash - name: stg_littlepay__micropayment_device_transactions - tests: - - *littlepay_uniqueness columns: - name: littlepay_transaction_id description: | @@ -416,10 +431,22 @@ models: For 'tap on, tap off' integrations, merchants will see two transactions for each micropayment. - name: micropayment_id description: The micropayment that the transaction is associated with. + - *lp_export_date + - *lp_export_ts + - *lp_line_number + - *payments_input_row_key + - *payments_key_full_uniqueness + - *_content_hash - name: stg_littlepay__micropayments tests: - - *littlepay_uniqueness + - &littlepay_uniqueness + dbt_utils.unique_combination_of_columns: + combination_of_columns: + - instance + - extract_filename + - ts + - _line_number description: This model makes the assumption that, in the case of duplicated micropayment_id values, the one with the latest transaction_time in the latest export file takes precedence. columns: - name: micropayment_id @@ -472,17 +499,9 @@ models: - *lp_line_number - *payments_input_row_key - *_content_hash - - &payments_key_full_uniqueness - name: _payments_key - description: | - Synthentic key composed of the elements that define a natural key within the source data (primary key according to Littlepay schema.) - tests: - - not_null - - unique + - *payments_key_full_uniqueness - name: stg_littlepay__product_data - tests: - - *littlepay_uniqueness columns: - name: participant_id description: Identifies the participant that the micropayment belongs to. @@ -617,6 +636,12 @@ models: description: | Configuration type for the incentive. Possible values are `DAILY_TABLE` or `ZONE_TRIANGLE`. + - *lp_export_date + - *lp_export_ts + - *lp_line_number + - *payments_input_row_key + - *payments_key_full_uniqueness + - *_content_hash - name: stg_littlepay__refunds tests: diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__customer_funding_source.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__customer_funding_source.sql index 4b6bb76836..ee7342ccce 100644 --- a/warehouse/models/staging/payments/littlepay/stg_littlepay__customer_funding_source.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__customer_funding_source.sql @@ -1,8 +1,8 @@ WITH source AS ( - SELECT * FROM {{ littlepay_source('external_littlepay', 'customer_funding_source') }} + SELECT * FROM {{ source('external_littlepay', 'customer_funding_source') }} ), -stg_littlepay__customer_funding_source AS ( +clean_columns AS ( SELECT {{ trim_make_empty_string_null('funding_source_id') }} AS funding_source_id, {{ trim_make_empty_string_null('funding_source_vault_id') }} AS funding_source_vault_id, @@ -14,11 +14,23 @@ stg_littlepay__customer_funding_source AS ( {{ trim_make_empty_string_null('issuer_country') }} AS issuer_country, {{ trim_make_empty_string_null('form_factor') }} AS form_factor, {{ trim_make_empty_string_null('principal_customer_id') }} AS principal_customer_id, - _line_number, + CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, ts, - littlepay_export_ts, + {{ extract_littlepay_filename_ts() }} AS littlepay_export_ts, + {{ extract_littlepay_filename_date() }} AS littlepay_export_date, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + {{ dbt_utils.generate_surrogate_key(['funding_source_id', 'funding_source_vault_id', + 'customer_id', 'bin', 'masked_pan', 'card_scheme', 'issuer', 'issuer_country', + 'form_factor']) }} AS _content_hash, + FROM source +), + +add_keys_drop_full_dupes AS ( + SELECT + *, -- flag in reverse order, since we usually want the latest DENSE_RANK() OVER ( PARTITION BY funding_source_id @@ -29,7 +41,47 @@ stg_littlepay__customer_funding_source AS ( DENSE_RANK() OVER ( PARTITION BY customer_id ORDER BY littlepay_export_ts DESC) AS calitp_customer_id_rank, - FROM source + -- generate keys now that input columns have been trimmed & cast and files deduped + {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, + {{ dbt_utils.generate_surrogate_key(['funding_source_id', 'customer_id']) }} AS _payments_key, + FROM clean_columns + {{ qualify_dedupe_full_duplicate_lp_rows() }} +), + +stg_littlepay__customer_funding_source AS ( + SELECT + funding_source_id, + funding_source_vault_id, + customer_id, + bin, + masked_pan, + card_scheme, + issuer, + issuer_country, + form_factor, + principal_customer_id, + _line_number, + `instance`, + extract_filename, + ts, + littlepay_export_ts, + littlepay_export_date, + calitp_funding_source_id_rank, + calitp_funding_source_vault_id_rank, + calitp_customer_id_rank, + _key, + _payments_key, + _content_hash, + FROM add_keys_drop_full_dupes + -- Some funding sources have incomplete information when first present in data, like missing + -- values for form_factor or issuer_country that are filled in during later exports. + -- Additionally, sometimes a filled column value is updated in newer exports for a given entry. + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY + funding_source_id, + customer_id + ORDER BY littlepay_export_ts DESC + ) = 1 ) SELECT * FROM stg_littlepay__customer_funding_source diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__device_transaction_purchases.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__device_transaction_purchases.sql index d61176da04..3f0a65d515 100644 --- a/warehouse/models/staging/payments/littlepay/stg_littlepay__device_transaction_purchases.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__device_transaction_purchases.sql @@ -2,7 +2,7 @@ WITH source AS ( SELECT * FROM {{ source('external_littlepay', 'device_transaction_purchases') }} ), -stg_littlepay__device_transaction_purchases AS ( +clean_columns AS ( SELECT {{ trim_make_empty_string_null('littlepay_transaction_id') }} AS littlepay_transaction_id, {{ trim_make_empty_string_null('purchase_id') }} AS purchase_id, @@ -11,11 +11,57 @@ stg_littlepay__device_transaction_purchases AS ( {{ trim_make_empty_string_null('description') }} AS description, {{ safe_cast('indicative_amount', type_numeric()) }} AS indicative_amount, {{ safe_cast('transaction_time', type_timestamp()) }} AS transaction_time, - _line_number, + CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, + {{ extract_littlepay_filename_ts() }} AS littlepay_export_ts, + {{ extract_littlepay_filename_date() }} AS littlepay_export_date, ts, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + {{ dbt_utils.generate_surrogate_key(['littlepay_transaction_id', 'purchase_id', + 'correlated_purchase_id', 'product_id', 'description', 'indicative_amount', + 'transaction_time']) }} AS _content_hash, FROM source +), + +add_keys_drop_full_dupes AS ( + SELECT + *, + -- generate keys now that input columns have been trimmed & cast and files deduped + {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, + {{ dbt_utils.generate_surrogate_key(['littlepay_transaction_id', 'purchase_id']) }} AS _payments_key, + FROM clean_columns + {{ qualify_dedupe_full_duplicate_lp_rows() }} +), + +stg_littlepay__device_transaction_purchases AS ( + SELECT + littlepay_transaction_id, + purchase_id, + correlated_purchase_id, + product_id, + description, + indicative_amount, + transaction_time, + _line_number, + `instance`, + extract_filename, + littlepay_export_ts, + littlepay_export_date, + ts, + _key, + _payments_key, + _content_hash, + FROM add_keys_drop_full_dupes + -- Some purchases initially are given a value of 'autoscan' for product_id, and then that + -- value is later updated. No other partial duplicate conditions exist at implementation time. + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY + littlepay_transaction_id, + purchase_id + ORDER BY littlepay_export_ts DESC + ) = 1 ) SELECT * FROM stg_littlepay__device_transaction_purchases diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__device_transactions.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__device_transactions.sql index a069ea2e1a..f192d627ea 100644 --- a/warehouse/models/staging/payments/littlepay/stg_littlepay__device_transactions.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__device_transactions.sql @@ -1,8 +1,8 @@ WITH source AS ( - SELECT * FROM {{ littlepay_source('external_littlepay', 'device_transactions') }} + SELECT * FROM {{ source('external_littlepay', 'device_transactions') }} ), -stg_littlepay__device_transactions AS ( +clean_columns AS ( SELECT {{ trim_make_empty_string_null('participant_id') }} AS participant_id, {{ trim_make_empty_string_null('customer_id') }} AS customer_id, @@ -32,13 +32,73 @@ stg_littlepay__device_transactions AS ( {{ trim_make_empty_string_null('vehicle_id') }} AS vehicle_id, {{ trim_make_empty_string_null('granted_zone_ids') }} AS granted_zone_ids, {{ trim_make_empty_string_null('onward_zone_ids') }} AS onward_zone_ids, - _line_number, + CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, + {{ extract_littlepay_filename_ts() }} AS littlepay_export_ts, + {{ extract_littlepay_filename_date() }} AS littlepay_export_date, ts, - littlepay_export_ts, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + {{ dbt_utils.generate_surrogate_key(['participant_id', 'customer_id', + 'device_transaction_id', 'littlepay_transaction_id', 'device_id', 'device_id_issuer', + 'type', 'transaction_outcome', 'transction_deny_reason', 'transaction_date_time_utc', + 'location_id', 'location_scheme', 'location_name', 'zone_id', 'route_id', 'mode', + 'direction', 'latitude', 'longitude', 'vehicle_id', 'granted_zone_ids', + 'onward_zone_ids']) }} AS _content_hash, FROM source - QUALIFY ROW_NUMBER() OVER (PARTITION BY littlepay_transaction_id ORDER BY littlepay_export_ts DESC, transaction_date_time_utc DESC) = 1 +), + +add_keys_drop_full_dupes AS ( + SELECT + *, + -- generate keys now that input columns have been trimmed & cast and files deduped + {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, + littlepay_transaction_id AS _payments_key, + FROM clean_columns + {{ qualify_dedupe_full_duplicate_lp_rows() }} +), + +stg_littlepay__device_transactions AS ( + SELECT + participant_id, customer_id, + device_transaction_id, + littlepay_transaction_id, + device_id, + device_id_issuer, + type, + transaction_outcome, + transction_deny_reason, + transaction_date_time_utc, + transaction_date_time_pacific, + location_id, + location_scheme, + location_name, + zone_id, + route_id, + mode, + direction, + latitude, + longitude, + vehicle_id, + granted_zone_ids, + onward_zone_ids, + _line_number, + `instance`, + extract_filename, + littlepay_export_ts, + littlepay_export_date, + ts, + _key, + _payments_key, + _content_hash, + FROM add_keys_drop_full_dupes + -- Some transactions have placeholder information for routes, stops, and directions in their first export, + -- then a later export contains the canonical version of the transaction with that information corrected + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY littlepay_transaction_id + ORDER BY littlepay_export_ts DESC + ) = 1 ) SELECT * FROM stg_littlepay__device_transactions diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayment_adjustments.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayment_adjustments.sql index 85a1fa6bed..7b0b959ec1 100644 --- a/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayment_adjustments.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayment_adjustments.sql @@ -1,8 +1,8 @@ WITH source AS ( - SELECT * FROM {{ littlepay_source('external_littlepay', 'micropayment_adjustments') }} + SELECT * FROM {{ source('external_littlepay', 'micropayment_adjustments') }} ), -stg_littlepay__micropayment_adjustments AS ( +clean_columns AS ( SELECT {{ trim_make_empty_string_null('micropayment_id') }} AS micropayment_id, {{ trim_make_empty_string_null('adjustment_id') }} AS adjustment_id, @@ -15,26 +15,53 @@ stg_littlepay__micropayment_adjustments AS ( {{ trim_make_empty_string_null('time_period_type') }} AS time_period_type, {{ safe_cast('applied', type_boolean()) }} AS applied, {{ trim_make_empty_string_null('zone_ids_us') }} AS zone_ids_us, - _line_number, + CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, ts, + {{ extract_littlepay_filename_ts() }} AS littlepay_export_ts, + {{ extract_littlepay_filename_date() }} AS littlepay_export_date, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + {{ dbt_utils.generate_surrogate_key(['micropayment_id', 'adjustment_id', 'participant_id', + 'customer_id', 'product_id', 'type', 'description', 'amount', 'time_period_type', + 'applied', 'zone_ids_us']) }} AS _content_hash, FROM source - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY - micropayment_id, - adjustment_id, - participant_id, - customer_id, - product_id, - type, - description, - amount, - time_period_type, - applied, - zone_ids_us - ORDER BY littlepay_export_ts DESC - ) = 1 +), + +add_keys_drop_full_dupes AS ( + SELECT + *, + -- generate keys now that input columns have been trimmed & cast and files deduped + {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, + {{ dbt_utils.generate_surrogate_key(['micropayment_id', 'adjustment_id']) }} AS _payments_key, + FROM clean_columns + {{ qualify_dedupe_full_duplicate_lp_rows() }} +), + +stg_littlepay__micropayment_adjustments AS ( + SELECT + micropayment_id, + adjustment_id, + participant_id, + customer_id, + product_id, + type, + description, + amount, + time_period_type, + applied, + zone_ids_us, + _line_number, + `instance`, + extract_filename, + ts, + littlepay_export_ts, + littlepay_export_date, + _key, + _payments_key, + _content_hash, + FROM add_keys_drop_full_dupes ) SELECT * FROM stg_littlepay__micropayment_adjustments diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayment_device_transactions.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayment_device_transactions.sql index b1647c20f4..387fe94217 100644 --- a/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayment_device_transactions.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__micropayment_device_transactions.sql @@ -1,21 +1,47 @@ WITH source AS ( - SELECT * FROM {{ littlepay_source('external_littlepay', 'micropayment_device_transactions') }} + SELECT * FROM {{ source('external_littlepay', 'micropayment_device_transactions') }} ), -stg_littlepay__micropayment_device_transactions AS ( +clean_columns AS ( SELECT {{ trim_make_empty_string_null('littlepay_transaction_id') }} AS littlepay_transaction_id, {{ trim_make_empty_string_null('micropayment_id') }} AS micropayment_id, - _line_number, + CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, ts, + {{ extract_littlepay_filename_ts() }} AS littlepay_export_ts, + {{ extract_littlepay_filename_date() }} AS littlepay_export_date, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + {{ dbt_utils.generate_surrogate_key(['littlepay_transaction_id', 'micropayment_id']) }} AS _content_hash, FROM source - QUALIFY ROW_NUMBER() OVER ( - -- could this be a distinct? - PARTITION BY littlepay_transaction_id, micropayment_id - ORDER BY littlepay_export_ts DESC - ) = 1 +), + +add_keys_drop_full_dupes AS ( + SELECT + *, + -- generate keys now that input columns have been trimmed & cast and files deduped + {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, + {{ dbt_utils.generate_surrogate_key(['littlepay_transaction_id', 'micropayment_id']) }} AS _payments_key, + FROM clean_columns + {{ qualify_dedupe_full_duplicate_lp_rows() }} +), + +stg_littlepay__micropayment_device_transactions AS ( + SELECT + littlepay_transaction_id, + micropayment_id, + _line_number, + `instance`, + extract_filename, + ts, + littlepay_export_ts, + littlepay_export_date, + _key, + _payments_key, + _content_hash, + FROM add_keys_drop_full_dupes ) SELECT * FROM stg_littlepay__micropayment_device_transactions diff --git a/warehouse/models/staging/payments/littlepay/stg_littlepay__product_data.sql b/warehouse/models/staging/payments/littlepay/stg_littlepay__product_data.sql index 51e94abc9c..d97f40d07e 100644 --- a/warehouse/models/staging/payments/littlepay/stg_littlepay__product_data.sql +++ b/warehouse/models/staging/payments/littlepay/stg_littlepay__product_data.sql @@ -1,8 +1,8 @@ WITH source AS ( - SELECT * FROM {{ littlepay_source('external_littlepay', 'product_data') }} + SELECT * FROM {{ source('external_littlepay', 'product_data') }} ), -stg_littlepay__product_data AS ( +clean_columns AS ( SELECT {{ trim_make_empty_string_null('participant_id') }} AS participant_id, {{ trim_make_empty_string_null('product_id') }} AS product_id, @@ -29,12 +29,79 @@ stg_littlepay__product_data AS ( {{ trim_make_empty_string_null('capping_time_zone') }} AS capping_time_zone, {{ safe_cast('capping_overlap', 'TIME') }} AS capping_overlap, {{ trim_make_empty_string_null('capping_application_level') }} AS capping_application_level, - _line_number, + CAST(_line_number AS INTEGER) AS _line_number, `instance`, extract_filename, + {{ extract_littlepay_filename_ts() }} AS littlepay_export_ts, + {{ extract_littlepay_filename_date() }} AS littlepay_export_date, ts, + -- hash all content not generated by us to enable deduping full dup rows + -- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream + {{ dbt_utils.generate_surrogate_key(['participant_id', 'product_id', 'product_code', + 'product_description', 'product_type', 'activation_type', 'product_status', + 'created_date', 'capping_type', 'multi_operator', 'capping_start_time', + 'capping_end_time', 'rules_transaction_types', 'rules_default_limit', + 'rules_max_fare_value', 'scheduled_start_date_time', 'scheduled_end_date_time', + 'all_day', 'weekly_cap_start_day', 'number_of_days_in_cap_window', + 'capping_duration', 'number_of_transfer', 'capping_time_zone', 'capping_overlap', + 'capping_application_level']) }} AS _content_hash, FROM source - QUALIFY ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY littlepay_export_ts DESC) = 1 +), + +add_keys_drop_full_dupes AS ( + SELECT + *, + -- generate keys now that input columns have been trimmed & cast and files deduped + {{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key, + product_id AS _payments_key, + FROM clean_columns + {{ qualify_dedupe_full_duplicate_lp_rows() }} +), + +stg_littlepay__product_data AS ( + SELECT + participant_id, + product_id, + product_code, + product_description, + product_type, + activation_type, + product_status, + created_date, + capping_type, + multi_operator, + capping_start_time, + capping_end_time, + rules_transaction_types, + rules_default_limit, + rules_max_fare_value, + scheduled_start_date_time, + scheduled_end_date_time, + all_day, + weekly_cap_start_day, + number_of_days_in_cap_window, + capping_duration, + number_of_transfer, + capping_time_zone, + capping_overlap, + capping_application_level, + _line_number, + `instance`, + extract_filename, + littlepay_export_ts, + littlepay_export_date, + ts, + _key, + _payments_key, + _content_hash, + FROM add_keys_drop_full_dupes + -- Some products change in form over time, e.g. getting different 'capping_type' values or + -- changing in status, which produces replacement rows in new exports. + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY + product_id + ORDER BY littlepay_export_ts DESC + ) = 1 ) SELECT * FROM stg_littlepay__product_data