Skip to content

Commit

Permalink
AN-5236/new transfers model (#364)
Browse files Browse the repository at this point in the history
* new transfers models - still a few edge cases

* fix 1 withdrawal 2 deposited edge case

* upd filter in legacy model

* core union all and view -> incremental

* rm silver so
  • Loading branch information
forgxyz authored Sep 19, 2024
1 parent 24a7caf commit 96755a0
Show file tree
Hide file tree
Showing 12 changed files with 414 additions and 42 deletions.
5 changes: 5 additions & 0 deletions models/descriptions/amount_adj.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% docs amount_adj %}

An amount with decimal adjustment applied.

{% enddocs %}
5 changes: 5 additions & 0 deletions models/descriptions/deposited_uuid_root.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% docs deposited_uuid_root %}

The UUID of the transfer that was deposited to the recipient. This is the root deposit, as certain transfers that utilize a Vault may have two subsequent Deposited events.

{% enddocs %}
5 changes: 5 additions & 0 deletions models/descriptions/from_address_balance_after.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% docs from_address_balance_after %}

The token balance of the withdrawn token in the sender account after the transfer.

{% enddocs %}
5 changes: 5 additions & 0 deletions models/descriptions/is_fee_transfer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% docs is_fee_transfer %}

A boolean field indicating if the transfer was the transaction fee payment, as assessed by to_uuid = 0

{% enddocs %}
5 changes: 5 additions & 0 deletions models/descriptions/to_address_balance_after.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% docs to_address_balance_after %}

The token balance of the deposited token in the recipient account after the transfer.

{% enddocs %}
5 changes: 5 additions & 0 deletions models/descriptions/withdrawn_uuid_root.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% docs withdrawn_uuid_root %}

The UUID of the transfer that was withdrawn from the sender's account. This is the root withdrawal, as certain transfers that utilize a Vault may have two subsequent Withdrawal events.

{% enddocs %}
101 changes: 77 additions & 24 deletions models/gold/core/core__ez_token_transfers.sql
Original file line number Diff line number Diff line change
@@ -1,29 +1,82 @@
{{ config(
materialized = 'view',
tags = ['ez', 'scheduled']
materialized = 'incremental',
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
cluster_by = ['block_timestamp::date', 'modified_timestamp::date'],
unique_key = "ez_token_transfers_id",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_id,sender,recipient,token_contract);",
tags = ['scheduled_non_core']
) }}

SELECT
block_height,
block_timestamp,
tx_id,
sender,
recipient,
token_contract,
amount,
tx_succeeded,
COALESCE (
token_transfers_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id','sender', 'recipient','token_contract', 'amount']
) }}
) AS ez_token_transfers_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__token_transfers_s') }}
WITH pre_crescendo AS (

SELECT
block_height,
block_timestamp,
tx_id,
sender,
recipient,
token_contract,
amount,
tx_succeeded,
COALESCE (
token_transfers_id,
{{ dbt_utils.generate_surrogate_key(
['tx_id','sender', 'recipient','token_contract', 'amount']
) }}
) AS ez_token_transfers_id,
inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__token_transfers_s') }}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
)

{% if is_incremental() %}
AND modified_timestamp > (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
post_crescendo AS (
SELECT
block_height,
block_timestamp,
tx_id,
from_address AS sender,
to_address AS recipient,
token_contract,
amount_adj :: FLOAT AS amount,
tx_succeeded,
token_transfers_id AS ez_token_transfers_id,
inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__token_transfers') }}

{% if is_incremental() %}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
modified_timestamp > (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
*
FROM
pre_crescendo
UNION ALL
SELECT
*
FROM
post_crescendo
19 changes: 8 additions & 11 deletions models/gold/core/core__ez_token_transfers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,11 @@ models:
- name: core__ez_token_transfers
description: |-
This table records all token transfers on the FLOW blockchain.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
- sender
- recipient
- token_contract
- amount
config:
severity: warn
- dbt_utils.recency:
datepart: hour
field: block_timestamp
interval: 1

columns:
- name: TX_ID
Expand Down Expand Up @@ -45,7 +39,6 @@ models:
- name: SENDER
description: "{{ doc('sender') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
Expand Down Expand Up @@ -87,6 +80,10 @@ models:

- name: EZ_TOKEN_TRANSFERS_ID
description: "{{ doc('pk_id') }}"
tests:
- not_null
- unique:
where: block_height >= 85981726

- name: INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
Expand Down
196 changes: 196 additions & 0 deletions models/silver/transfers/silver__token_transfers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
cluster_by = ['block_timestamp::date', 'modified_timestamp::date'],
unique_key = "token_transfers_id",
tags = ['scheduled_non_core']
) }}

WITH events AS (

SELECT
block_height,
block_timestamp,
tx_id,
tx_succeeded,
event_index,
event_type,
event_contract,
event_data,
_inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
_partition_by_block_id >= 85000000
AND block_height >= 85981726
AND (
(
event_contract = 'A.f233dcee88fe0abe.FungibleToken'
AND event_type IN (
'Withdrawn',
'Deposited'
)
)
OR (
-- no initial "Withdrawal" event if it's a new token mint
-- and contract will be the token minted, not the new FT contract
event_type IN ('TokensMinted')
)
)

{% if is_incremental() %}
AND modified_timestamp > (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
withdrawn AS (
SELECT
tx_id,
event_index,
event_contract,
event_data :amount :: STRING AS amount_adj,
event_data :balanceAfter :: STRING AS balance_after_adj,
event_data :from :: STRING AS from_address,
event_data :fromUUID :: STRING AS from_uuid,
event_data :type :: STRING AS token_type,
event_data :withdrawnUUID :: STRING AS withdrawn_uuid,
_inserted_timestamp
FROM
events
WHERE
event_type = 'Withdrawn'
),
deposited AS (
SELECT
tx_id,
event_index,
block_height,
block_timestamp,
event_contract,
event_data :amount :: STRING AS amount_adj,
event_data :balanceAfter :: STRING AS balance_after_adj,
event_data :depositedUUID :: STRING AS deposited_uuid,
event_data :to :: STRING AS to_address,
event_data :toUUID :: STRING AS to_uuid,
event_data :type :: STRING AS token_type,
ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
event_index
) AS rn,
tx_succeeded,
_inserted_timestamp
FROM
events
WHERE
event_type = 'Deposited'
),
minted AS (
SELECT
tx_id,
event_index,
event_contract AS token_type,
event_data :amount :: STRING AS amount_adj,
COALESCE(
event_data :from :: STRING,
event_data :type :: STRING
) AS from_address,
'-1' AS withdrawn_uuid,
ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
event_index
) AS rn
FROM
events
WHERE
event_type = 'TokensMinted'
),
FINAL AS (
SELECT
COALESCE(
d2.deposited_uuid,
d.deposited_uuid
) AS deposited_uuid_root,
COALESCE(
w2.withdrawn_uuid,
w.withdrawn_uuid,
m.withdrawn_uuid
) AS withdrawn_uuid_root,
d.tx_id,
d.block_height,
d.block_timestamp,
REGEXP_REPLACE(
COALESCE(
d.token_type,
d2.token_type,
w2.token_type,
w.token_type,
m.token_type
),
'\.Vault$',
''
) AS token_contract,
COALESCE(
w2.from_address,
w.from_address,
m.from_address
) AS from_address,
COALESCE(
d2.to_address,
d.to_address
) AS to_address,
COALESCE(
d.amount_adj,
d2.amount_adj,
w2.amount_adj,
w.amount_adj,
m.amount_adj
) AS amount_adj,
COALESCE(
w2.balance_after_adj,
w.balance_after_adj
) AS from_address_balance_after,
COALESCE(
d2.balance_after_adj,
d.balance_after_adj
) AS to_address_balance_after,
d.to_uuid = '0' AS is_fee_transfer,
d.tx_succeeded,
d._inserted_timestamp
FROM
deposited d
LEFT JOIN deposited d2
ON d.tx_id = d2.tx_id
AND d.to_uuid = d2.deposited_uuid
LEFT JOIN withdrawn w
ON d.tx_id = w.tx_id
AND d.deposited_uuid = w.withdrawn_uuid
LEFT JOIN withdrawn w2
ON d.tx_id = w2.tx_id
AND w.from_uuid = w2.withdrawn_uuid
LEFT JOIN minted m
ON d.tx_id = m.tx_id
AND d.amount_adj = m.amount_adj
AND d.rn = m.rn
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['tx_id', 'deposited_uuid_root', 'withdrawn_uuid_root']
) }} AS token_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
qualify(ROW_NUMBER() over (PARTITION BY tx_id, deposited_uuid_root
ORDER BY
from_address = 'null')) = 1
Loading

0 comments on commit 96755a0

Please sign in to comment.