diff --git a/README.md b/README.md index 3c1c9c66..987137ad 100644 --- a/README.md +++ b/README.md @@ -35,35 +35,43 @@ gnosis: query_tag: ``` -### Variables +## Variables To control the creation of UDF or SP macros with dbt run: * UPDATE_UDFS_AND_SPS -When True, executes all macros included in the on-run-start hooks within dbt_project.yml on model run as normal -When False, none of the on-run-start macros are executed on model run + * Default values are False + * When True, executes all macros included in the on-run-start hooks within dbt_project.yml on model run as normal + * When False, none of the on-run-start macros are executed on model run -Default values are False + * Usage: `dbt run --vars '{"UPDATE_UDFS_AND_SPS":True}' -m ...` -* Usage: -dbt run --var '{"UPDATE_UDFS_AND_SPS":True}' -m ... +Use a variable to heal a model incrementally: +* HEAL_MODEL + * Default is FALSE (Boolean) + * When FALSE, logic will be negated + * When TRUE, heal logic will apply + * Include `heal` in model tags within the config block for inclusion in the `dbt_run_heal_models` workflow, e.g. `tags = 'heal'` -To reload records in a curated complete table without a full-refresh, such as `silver_bridge.complete_bridge_activity`: -* HEAL_CURATED_MODEL -Default is an empty array [] -When item is included in var array [], incremental logic will be skipped for that CTE / code block -When item is not included in var array [] or does not match specified item in model, incremental logic will apply -Example set up: `{% if is_incremental() and 'axelar' not in var('HEAL_CURATED_MODEL') %}` + * Usage: `dbt run --vars '{"HEAL_MODEL":True}' -m ...` -* Usage: -Single CTE: dbt run --var '{"HEAL_CURATED_MODEL":"axelar"}' -m ... -Multiple CTEs: dbt run --var '{"HEAL_CURATED_MODEL":["axelar","across","celer_cbridge"]}' -m ... +Use a variable to negate incremental logic: +* Example use case: reload records in a curated complete table without a full-refresh, such as `silver_bridge.complete_bridge_activity`: +* HEAL_MODELS + * Default is an empty array [] + * When item is included in var array [], incremental logic will be skipped for that CTE / code block + * When item is not included in var array [] or does not match specified item in model, incremental logic will apply + * Example set up: `{% if is_incremental() and 'axelar' not in var('HEAL_MODELS') %}` -### Resources: -- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction) -- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers -- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support -- Find [dbt events](https://events.getdbt.com) near you -- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices + * Usage: + * Single CTE: `dbt run --vars '{"HEAL_MODELS":"axelar"}' -m ...` + * Multiple CTEs: `dbt run --vars '{"HEAL_MODELS":["axelar","across","celer_cbridge"]}' -m ...` + +Use a variable to extend the incremental lookback period: +* LOOKBACK + * Default is a string representing the specified time interval e.g. '12 hours', '7 days' etc. + * Example set up: `SELECT MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'` + + * Usage: `dbt run --vars '{"LOOKBACK":"36 hours"}' -m ...` ## Applying Model Tags @@ -97,7 +105,7 @@ To add/update a model's snowflake tags, add/modify the `meta` model property und By default, model tags are pushed to Snowflake on each load. You can disable this by setting the `UPDATE_SNOWFLAKE_TAGS` project variable to `False` during a run. ``` -dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/core/core__fact_blocks.sql +dbt run --vars '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/core/core__fact_blocks.sql ``` ### Querying for existing tags on a model in snowflake @@ -105,4 +113,11 @@ dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/core/core__fact_blocks ``` select * from table(gnosis.information_schema.tag_references('gnosis.core.fact_blocks', 'table')); -``` \ No newline at end of file +``` + +### Resources: +- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction) +- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers +- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support +- Find [dbt events](https://events.getdbt.com) near you +- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices \ No newline at end of file diff --git a/dbt_project.yml b/dbt_project.yml index 7caea689..2b53afe9 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -66,7 +66,7 @@ vars: WAIT: 0 OBSERV_FULL_TEST: False HEAL_MODEL: False - HEAL_CURATED_MODEL: [] + HEAL_MODELS: [] START_GHA_TASKS: False API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}' EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}' diff --git a/models/gold/defi/defi__ez_bridge_activity.sql b/models/gold/defi/defi__ez_bridge_activity.sql index b214d77c..cddb0545 100644 --- a/models/gold/defi/defi__ez_bridge_activity.sql +++ b/models/gold/defi/defi__ez_bridge_activity.sql @@ -33,7 +33,13 @@ SELECT token_symbol, amount_unadj, amount, - amount_usd, + ROUND( + CASE + WHEN amount_usd < 1e+15 THEN amount_usd + ELSE NULL + END, + 2 + ) AS amount_usd, COALESCE ( complete_bridge_activity_id, {{ dbt_utils.generate_surrogate_key( diff --git a/models/gold/defi/defi__ez_dex_swaps.sql b/models/gold/defi/defi__ez_dex_swaps.sql index 5eb1d72e..f076973a 100644 --- a/models/gold/defi/defi__ez_dex_swaps.sql +++ b/models/gold/defi/defi__ez_dex_swaps.sql @@ -24,10 +24,26 @@ SELECT event_name, amount_in_unadj, amount_in, - amount_in_usd, + ROUND( + CASE + WHEN amount_out_usd IS NULL + OR ABS((amount_in_usd - amount_out_usd) / NULLIF(amount_out_usd, 0)) > 0.75 + OR ABS((amount_in_usd - amount_out_usd) / NULLIF(amount_in_usd, 0)) > 0.75 THEN NULL + ELSE amount_in_usd + END, + 2 + ) AS amount_in_usd, amount_out_unadj, amount_out, - amount_out_usd, + ROUND( + CASE + WHEN amount_in_usd IS NULL + OR ABS((amount_out_usd - amount_in_usd) / NULLIF(amount_in_usd, 0)) > 0.75 + OR ABS((amount_out_usd - amount_in_usd) / NULLIF(amount_out_usd, 0)) > 0.75 THEN NULL + ELSE amount_out_usd + END, + 2 + ) AS amount_out_usd, sender, tx_to, event_index, diff --git a/models/silver/core/silver__transfers.sql b/models/silver/core/silver__transfers.sql index 7db266f5..81ad73c8 100644 --- a/models/silver/core/silver__transfers.sql +++ b/models/silver/core/silver__transfers.sql @@ -35,7 +35,7 @@ AND _inserted_timestamp >= ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var('LOOKBACK', '4 hours') }}' FROM {{ this }} ) @@ -169,7 +169,7 @@ heal_model AS ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var('LOOKBACK', '4 hours') }}' FROM {{ this }} ) @@ -195,7 +195,7 @@ heal_model AS ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var('LOOKBACK', '4 hours') }}' FROM {{ this }} ) diff --git a/models/silver/defi/bridge/celer/silver_bridge__celer_cbridge_send.sql b/models/silver/defi/bridge/celer/silver_bridge__celer_cbridge_send.sql index 1672bc45..4702a1a9 100644 --- a/models/silver/defi/bridge/celer/silver_bridge__celer_cbridge_send.sql +++ b/models/silver/defi/bridge/celer/silver_bridge__celer_cbridge_send.sql @@ -46,6 +46,7 @@ WITH base_evt AS ( WHERE topics [0] :: STRING = '0x89d8051e597ab4178a863a5190407b98abfeff406aa8db90c59af76612e58f01' AND contract_address = '0x3795c36e7d12a8c252a20c5a7b455f7c57b60283' + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( diff --git a/models/silver/defi/bridge/hop/silver_bridge__hop_ammwrapper.sql b/models/silver/defi/bridge/hop/silver_bridge__hop_ammwrapper.sql index f4aafc45..12ef7a8c 100644 --- a/models/silver/defi/bridge/hop/silver_bridge__hop_ammwrapper.sql +++ b/models/silver/defi/bridge/hop/silver_bridge__hop_ammwrapper.sql @@ -14,6 +14,7 @@ WITH base_contracts AS ( {{ ref('silver__logs') }} WHERE topics [0] :: STRING = '0xe35dddd4ea75d7e9b3fe93af4f4e40e778c3da4074c9d93e7c6536f1e803c1eb' + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( diff --git a/models/silver/defi/bridge/hop/silver_bridge__hop_transfersent.sql b/models/silver/defi/bridge/hop/silver_bridge__hop_transfersent.sql index c782709d..acfb4094 100644 --- a/models/silver/defi/bridge/hop/silver_bridge__hop_transfersent.sql +++ b/models/silver/defi/bridge/hop/silver_bridge__hop_transfersent.sql @@ -51,6 +51,7 @@ WITH base_evt AS ( WHERE topics [0] :: STRING = '0xe35dddd4ea75d7e9b3fe93af4f4e40e778c3da4074c9d93e7c6536f1e803c1eb' AND origin_to_address IS NOT NULL + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( diff --git a/models/silver/defi/bridge/meson/silver_bridge__meson_transfers.sql b/models/silver/defi/bridge/meson/silver_bridge__meson_transfers.sql index ef883939..f6854264 100644 --- a/models/silver/defi/bridge/meson/silver_bridge__meson_transfers.sql +++ b/models/silver/defi/bridge/meson/silver_bridge__meson_transfers.sql @@ -126,6 +126,7 @@ dst_info AS ( WHERE contract_address = '0x25ab3efd52e6470681ce037cd546dc60726948d3' AND topics [0] :: STRING = '0x5ce4019f772fda6cb703b26bce3ec3006eb36b73f1d3a0eb441213317d9f5e9d' + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( diff --git a/models/silver/defi/bridge/silver_bridge__complete_bridge_activity.sql b/models/silver/defi/bridge/silver_bridge__complete_bridge_activity.sql index 259a3de9..27145c1e 100644 --- a/models/silver/defi/bridge/silver_bridge__complete_bridge_activity.sql +++ b/models/silver/defi/bridge/silver_bridge__complete_bridge_activity.sql @@ -1,12 +1,14 @@ +-- depends_on: {{ ref('silver__complete_token_prices') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = ['block_number','platform','version'], cluster_by = ['block_timestamp::DATE'], - tags = ['curated','reorg'] + tags = ['curated','reorg','heal'] ) }} WITH celer_cbridge AS ( + SELECT block_number, block_timestamp, @@ -31,11 +33,11 @@ WITH celer_cbridge AS ( FROM {{ ref('silver_bridge__celer_cbridge_send') }} -{% if is_incremental() and 'celer_cbridge' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'celer_cbridge' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -66,11 +68,11 @@ hop AS ( FROM {{ ref('silver_bridge__hop_transfersent') }} -{% if is_incremental() and 'hop' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'hop' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -101,11 +103,11 @@ meson AS ( FROM {{ ref('silver_bridge__meson_transfers') }} -{% if is_incremental() and 'meson' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'meson' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -127,7 +129,7 @@ all_protocols AS ( FROM meson ), -FINAL AS ( +complete_bridge_activity AS ( SELECT block_number, block_timestamp, @@ -144,14 +146,12 @@ FINAL AS ( receiver, destination_chain_receiver, CASE - WHEN platform = 'meson' - THEN destination_chain_id :: STRING + WHEN platform = 'meson' THEN destination_chain_id :: STRING WHEN d.chain_id IS NULL THEN destination_chain_id :: STRING ELSE d.chain_id :: STRING END AS destination_chain_id, CASE - WHEN platform = 'meson' - THEN LOWER(destination_chain) + WHEN platform = 'meson' THEN LOWER(destination_chain) WHEN d.chain IS NULL THEN LOWER(destination_chain) ELSE LOWER( d.chain @@ -171,7 +171,7 @@ FINAL AS ( 2 ) ELSE NULL - END AS amount_usd_unadj, + END AS amount_usd, _id, b._inserted_timestamp FROM @@ -196,6 +196,183 @@ FINAL AS ( ) = LOWER( b.destination_chain ) +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( + SELECT + block_number, + block_timestamp, + origin_from_address, + origin_to_address, + origin_function_signature, + tx_hash, + event_index, + bridge_address, + event_name, + platform, + version, + sender, + receiver, + destination_chain_receiver, + destination_chain_id, + destination_chain, + t0.token_address, + C.token_symbol AS token_symbol, + C.token_decimals AS token_decimals, + amount_unadj, + CASE + WHEN C.token_decimals IS NOT NULL THEN (amount_unadj / pow(10, C.token_decimals)) + ELSE amount_unadj + END AS amount_heal, + CASE + WHEN C.token_decimals IS NOT NULL THEN amount_heal * p.price + ELSE NULL + END AS amount_usd_heal, + _id, + t0._inserted_timestamp + FROM + {{ this }} + t0 + LEFT JOIN {{ ref('silver__contracts') }} C + ON t0.token_address = C.contract_address + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p + ON t0.token_address = p.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p.hour + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform, + '-', + t1.version + ) + FROM + {{ this }} + t1 + WHERE + t1.token_decimals IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t1.token_address) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t2.block_number, + '-', + t2.platform, + '-', + t2.version + ) + FROM + {{ this }} + t2 + WHERE + t2.amount_usd IS NULL + AND t2._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t2.token_address + AND p.hour = DATE_TRUNC( + 'hour', + t2.block_timestamp + ) + ) + GROUP BY + 1 + ) + ), + {% endif %} + + FINAL AS ( + SELECT + * + FROM + complete_bridge_activity + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + block_number, + block_timestamp, + origin_from_address, + origin_to_address, + origin_function_signature, + tx_hash, + event_index, + bridge_address, + event_name, + platform, + version, + sender, + receiver, + destination_chain_receiver, + destination_chain_id, + destination_chain, + token_address, + token_symbol, + token_decimals, + amount_unadj, + amount_heal AS amount, + amount_usd_heal AS amount_usd, + _id, + _inserted_timestamp +FROM + heal_model +{% endif %} ) SELECT block_number, @@ -219,21 +396,18 @@ SELECT token_decimals, amount_unadj, amount, - CASE - WHEN amount_usd_unadj < 1e+15 THEN amount_usd_unadj - ELSE NULL - END AS amount_usd, + amount_usd, _id, _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( - ['_id'] + ['_id'] ) }} AS complete_bridge_activity_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - FINAL -WHERE destination_chain <> 'gnosis' -qualify (ROW_NUMBER() over (PARTITION BY _id + FINAL +WHERE + destination_chain <> 'gnosis' qualify (ROW_NUMBER() over (PARTITION BY _id ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/silver/defi/dex/balancer/silver_dex__balancer_pools.sql b/models/silver/defi/dex/balancer/silver_dex__balancer_pools.sql index defdaa16..ebfeaa60 100644 --- a/models/silver/defi/dex/balancer/silver_dex__balancer_pools.sql +++ b/models/silver/defi/dex/balancer/silver_dex__balancer_pools.sql @@ -31,6 +31,7 @@ WITH pools_registered AS ( WHERE topics [0] :: STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8' + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( @@ -72,6 +73,7 @@ tokens_registered AS ( FROM pools_registered ) + AND tx_status = 'SUCCESS' ), function_sigs AS ( SELECT diff --git a/models/silver/defi/dex/balancer/silver_dex__balancer_swaps.sql b/models/silver/defi/dex/balancer/silver_dex__balancer_swaps.sql index 8f56f858..eb02c3d4 100644 --- a/models/silver/defi/dex/balancer/silver_dex__balancer_swaps.sql +++ b/models/silver/defi/dex/balancer/silver_dex__balancer_swaps.sql @@ -54,6 +54,7 @@ swaps_base AS ( WHERE topics [0] :: STRING = '0x2170c741c41531aec20e7c107c24eecfdd15e69c9bb0a8dd37b1840b9e0b207b' AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8' + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( diff --git a/models/silver/defi/dex/curve/silver_dex__curve_pools.sql b/models/silver/defi/dex/curve/silver_dex__curve_pools.sql index c3a6de02..7909dff9 100644 --- a/models/silver/defi/dex/curve/silver_dex__curve_pools.sql +++ b/models/silver/defi/dex/curve/silver_dex__curve_pools.sql @@ -33,7 +33,8 @@ WITH contract_deployments AS ( '0xd19baeadc667cf2015e395f2b08668ef120f41f5' ) AND TYPE ILIKE 'create%' - AND tx_status ILIKE 'success' + AND tx_status = 'SUCCESS' + AND trace_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( diff --git a/models/silver/defi/dex/curve/silver_dex__curve_swaps.sql b/models/silver/defi/dex/curve/silver_dex__curve_swaps.sql index 08111c66..470a767a 100644 --- a/models/silver/defi/dex/curve/silver_dex__curve_swaps.sql +++ b/models/silver/defi/dex/curve/silver_dex__curve_swaps.sql @@ -80,6 +80,7 @@ curve_base AS ( --TokenExchange '0xd013ca23e77a65003c2c659c5442c00c805371b7fc1ebd4c206c41d1536bd90b' --TokenExchangeUnderlying ) + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( @@ -148,6 +149,7 @@ token_transfers AS ( curve_base ) AND CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) <> '0x0000000000000000000000000000000000000000' + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( diff --git a/models/silver/defi/dex/honeyswap/silver_dex__honeyswap_pools.sql b/models/silver/defi/dex/honeyswap/silver_dex__honeyswap_pools.sql index 28c9cc19..d930d43e 100644 --- a/models/silver/defi/dex/honeyswap/silver_dex__honeyswap_pools.sql +++ b/models/silver/defi/dex/honeyswap/silver_dex__honeyswap_pools.sql @@ -27,6 +27,7 @@ WITH pool_creation AS ( WHERE contract_address = LOWER('0xA818b4F111Ccac7AA31D0BCc0806d64F2E0737D7') AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( diff --git a/models/silver/defi/dex/silver_dex__complete_dex_liquidity_pools.sql b/models/silver/defi/dex/silver_dex__complete_dex_liquidity_pools.sql index 7937a775..1bf5fd85 100644 --- a/models/silver/defi/dex/silver_dex__complete_dex_liquidity_pools.sql +++ b/models/silver/defi/dex/silver_dex__complete_dex_liquidity_pools.sql @@ -3,15 +3,15 @@ incremental_strategy = 'delete+insert', unique_key = ['block_number','platform','version'], cluster_by = ['block_timestamp::DATE'], - tags = ['curated','reorg'] + tags = ['curated','reorg','heal'] ) }} WITH contracts AS ( SELECT - contract_address AS address, - token_symbol AS symbol, - token_decimals AS decimals, + contract_address, + token_symbol, + token_decimals, _inserted_timestamp FROM {{ ref('silver__contracts') }} @@ -24,10 +24,8 @@ balancer AS ( contract_address, pool_address, pool_name, - 'balancer' AS platform, - 'v1' AS version, - _log_id AS _id, - _inserted_timestamp, + NULL AS fee, + NULL AS tick_spacing, token0, token1, token2, @@ -35,15 +33,19 @@ balancer AS ( token4, token5, token6, - token7 + token7, + 'balancer' AS platform, + 'v1' AS version, + _log_id AS _id, + _inserted_timestamp FROM {{ ref('silver_dex__balancer_pools') }} -{% if is_incremental() %} +{% if is_incremental() and 'balancer' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '12 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -57,10 +59,8 @@ curve AS ( deployer_address AS contract_address, pool_address, pool_name, - 'curve' AS platform, - 'v1' AS version, - _call_id AS _id, - _inserted_timestamp, + NULL AS fee, + NULL AS tick_spacing, MAX( CASE WHEN token_num = 1 THEN token_address @@ -100,15 +100,19 @@ curve AS ( CASE WHEN token_num = 8 THEN token_address END - ) AS token7 + ) AS token7, + 'curve' AS platform, + 'v1' AS version, + _call_id AS _id, + _inserted_timestamp FROM {{ ref('silver_dex__curve_pools') }} -{% if is_incremental() %} +{% if is_incremental() and 'curve' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '12 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -124,8 +128,16 @@ honeyswap AS ( contract_address, pool_address, NULL AS pool_name, + NULL AS fee, + NULL AS tick_spacing, token0, token1, + NULL AS token2, + NULL AS token3, + NULL AS token4, + NULL AS token5, + NULL AS token6, + NULL AS token7, 'honeyswap' AS platform, 'v1' AS version, _log_id AS _id, @@ -133,11 +145,11 @@ honeyswap AS ( FROM {{ ref('silver_dex__honeyswap_pools') }} -{% if is_incremental() %} +{% if is_incremental() and 'honeyswap' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '12 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -151,8 +163,16 @@ swapr AS ( contract_address, pool_address, NULL AS pool_name, + NULL AS fee, + NULL AS tick_spacing, token0, token1, + NULL AS token2, + NULL AS token3, + NULL AS token4, + NULL AS token5, + NULL AS token6, + NULL AS token7, 'swapr' AS platform, 'v1' AS version, _log_id AS _id, @@ -160,11 +180,11 @@ swapr AS ( FROM {{ ref('silver_dex__swapr_pools') }} -{% if is_incremental() %} +{% if is_incremental() and 'swapr' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '12 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -178,8 +198,16 @@ sushi AS ( contract_address, pool_address, NULL AS pool_name, + NULL AS fee, + NULL AS tick_spacing, token0, token1, + NULL AS token2, + NULL AS token3, + NULL AS token4, + NULL AS token5, + NULL AS token6, + NULL AS token7, 'sushiswap' AS platform, 'v1' AS version, _log_id AS _id, @@ -187,17 +215,17 @@ sushi AS ( FROM {{ ref('silver_dex__sushi_pools') }} -{% if is_incremental() %} +{% if is_incremental() and 'sushi' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '12 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) {% endif %} ), -all_pools_standard AS ( +all_pools AS ( SELECT * FROM @@ -212,8 +240,7 @@ all_pools_standard AS ( * FROM sushi -), -all_pools_other AS ( + UNION ALL SELECT * FROM @@ -224,172 +251,641 @@ all_pools_other AS ( FROM curve ), -FINAL AS ( +--when pool_name is null and balancer,curve +complete_lps AS ( SELECT block_number, block_timestamp, tx_hash, - contract_address, + p.contract_address, pool_address, CASE - WHEN pool_name IS NULL THEN CONCAT( + WHEN pool_name IS NOT NULL THEN pool_name + WHEN pool_name IS NULL + AND platform IN ( + 'balancer', + 'curve' + ) THEN CONCAT( + COALESCE(c0.token_symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)), + CASE + WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.token_symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42)) + ELSE '' + END, + CASE + WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.token_symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42)) + ELSE '' + END, + CASE + WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.token_symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42)) + ELSE '' + END, + CASE + WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.token_symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42)) + ELSE '' + END, + CASE + WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.token_symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42)) + ELSE '' + END, + CASE + WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.token_symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42)) + ELSE '' + END, + CASE + WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.token_symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42)) + ELSE '' + END + ) + ELSE CONCAT( COALESCE( - c0.symbol, + c0.token_symbol, CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42)) ), '-', COALESCE( - c1.symbol, + c1.token_symbol, CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42)) ) ) - ELSE pool_name END AS pool_name, + fee, + tick_spacing, + token0, + token1, + token2, + token3, + token4, + token5, + token6, + token7, OBJECT_CONSTRUCT( 'token0', token0, 'token1', - token1 + token1, + 'token2', + token2, + 'token3', + token3, + 'token4', + token4, + 'token5', + token5, + 'token6', + token6, + 'token7', + token7 ) AS tokens, OBJECT_CONSTRUCT( 'token0', - c0.symbol, + c0.token_symbol, 'token1', - c1.symbol + c1.token_symbol, + 'token2', + c2.token_symbol, + 'token3', + c3.token_symbol, + 'token4', + c4.token_symbol, + 'token5', + c5.token_symbol, + 'token6', + c6.token_symbol, + 'token7', + c7.token_symbol ) AS symbols, OBJECT_CONSTRUCT( 'token0', - c0.decimals, + c0.token_decimals, 'token1', - c1.decimals + c1.token_decimals, + 'token2', + c2.token_decimals, + 'token3', + c3.token_decimals, + 'token4', + c4.token_decimals, + 'token5', + c5.token_decimals, + 'token6', + c6.token_decimals, + 'token7', + c7.token_decimals ) AS decimals, platform, version, _id, p._inserted_timestamp FROM - all_pools_standard p + all_pools p LEFT JOIN contracts c0 - ON c0.address = p.token0 + ON c0.contract_address = p.token0 LEFT JOIN contracts c1 - ON c1.address = p.token1 - UNION ALL + ON c1.contract_address = p.token1 + LEFT JOIN contracts c2 + ON c2.contract_address = p.token2 + LEFT JOIN contracts c3 + ON c3.contract_address = p.token3 + LEFT JOIN contracts c4 + ON c4.contract_address = p.token4 + LEFT JOIN contracts c5 + ON c5.contract_address = p.token5 + LEFT JOIN contracts c6 + ON c6.contract_address = p.token6 + LEFT JOIN contracts c7 + ON c7.contract_address = p.token7 +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( SELECT block_number, block_timestamp, tx_hash, - contract_address, + t0.contract_address, pool_address, CASE - WHEN pool_name IS NULL THEN CONCAT( - COALESCE(c0.symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)), + WHEN pool_name IS NOT NULL THEN pool_name + WHEN pool_name IS NULL + AND platform IN ( + 'balancer', + 'curve' + ) THEN CONCAT( + COALESCE(c0.token_symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)), CASE - WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42)) + WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.token_symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42)) ELSE '' END, CASE - WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42)) + WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.token_symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42)) ELSE '' END, CASE - WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42)) + WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.token_symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42)) ELSE '' END, CASE - WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42)) + WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.token_symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42)) ELSE '' END, CASE - WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42)) + WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.token_symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42)) ELSE '' END, CASE - WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42)) + WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.token_symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42)) ELSE '' END, CASE - WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42)) + WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.token_symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42)) ELSE '' END ) - ELSE pool_name - END AS pool_name, - OBJECT_CONSTRUCT( - 'token0', - token0, - 'token1', - token1, - 'token2', - token2, - 'token3', - token3, - 'token4', - token4, - 'token5', - token5, - 'token6', - token6, - 'token7', - token7 - ) AS tokens, + ELSE CONCAT( + COALESCE( + c0.token_symbol, + CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42)) + ), + '-', + COALESCE( + c1.token_symbol, + CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42)) + ) + ) + END AS pool_name_heal, + fee, + tick_spacing, + token0, + token1, + token2, + token3, + token4, + token5, + token6, + token7, + tokens, OBJECT_CONSTRUCT( 'token0', - c0.symbol, + c0.token_symbol, 'token1', - c1.symbol, + c1.token_symbol, 'token2', - c2.symbol, + c2.token_symbol, 'token3', - c3.symbol, + c3.token_symbol, 'token4', - c4.symbol, + c4.token_symbol, 'token5', - c5.symbol, + c5.token_symbol, 'token6', - c6.symbol, + c6.token_symbol, 'token7', - c7.symbol - ) AS symbols, + c7.token_symbol + ) AS symbols_heal, OBJECT_CONSTRUCT( 'token0', - c0.decimals, + c0.token_decimals, 'token1', - c1.decimals, + c1.token_decimals, 'token2', - c2.decimals, + c2.token_decimals, 'token3', - c3.decimals, + c3.token_decimals, 'token4', - c4.decimals, + c4.token_decimals, 'token5', - c5.decimals, + c5.token_decimals, 'token6', - c6.decimals, + c6.token_decimals, 'token7', - c7.decimals - ) AS decimals, + c7.token_decimals + ) AS decimals_heal, platform, version, _id, - p._inserted_timestamp + t0._inserted_timestamp FROM - all_pools_other p + {{ this }} + t0 LEFT JOIN contracts c0 - ON c0.address = p.token0 + ON c0.contract_address = t0.token0 LEFT JOIN contracts c1 - ON c1.address = p.token1 + ON c1.contract_address = t0.token1 LEFT JOIN contracts c2 - ON c2.address = p.token2 + ON c2.contract_address = t0.token2 LEFT JOIN contracts c3 - ON c3.address = p.token3 + ON c3.contract_address = t0.token3 LEFT JOIN contracts c4 - ON c4.address = p.token4 + ON c4.contract_address = t0.token4 LEFT JOIN contracts c5 - ON c5.address = p.token5 + ON c5.contract_address = t0.token5 LEFT JOIN contracts c6 - ON c6.address = p.token6 + ON c6.contract_address = t0.token6 LEFT JOIN contracts c7 - ON c7.address = p.token7 + ON c7.contract_address = t0.token7 + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform, + '-', + t1.version + ) + FROM + {{ this }} + t1 + WHERE + t1.decimals :token0 :: INT IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t1.tokens :token0 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t2.block_number, + '-', + t2.platform, + '-', + t2.version + ) + FROM + {{ this }} + t2 + WHERE + t2.decimals :token1 :: INT IS NULL + AND t2._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t2.tokens :token1 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t3.block_number, + '-', + t3.platform, + '-', + t3.version + ) + FROM + {{ this }} + t3 + WHERE + t3.decimals :token2 :: INT IS NULL + AND t3._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t3.tokens :token2 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t4.block_number, + '-', + t4.platform, + '-', + t4.version + ) + FROM + {{ this }} + t4 + WHERE + t4.decimals :token3 :: INT IS NULL + AND t4._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t4.tokens :token3 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t5.block_number, + '-', + t5.platform, + '-', + t5.version + ) + FROM + {{ this }} + t5 + WHERE + t5.decimals :token4 :: INT IS NULL + AND t5._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t5.tokens :token4 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t6.block_number, + '-', + t6.platform, + '-', + t6.version + ) + FROM + {{ this }} + t6 + WHERE + t6.decimals :token5 :: INT IS NULL + AND t6._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t6.tokens :token5 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t7.block_number, + '-', + t7.platform, + '-', + t7.version + ) + FROM + {{ this }} + t7 + WHERE + t7.decimals :token6 :: INT IS NULL + AND t7._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t7.tokens :token6 :: STRING) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t8.block_number, + '-', + t8.platform, + '-', + t8.version + ) + FROM + {{ this }} + t8 + WHERE + t8.decimals :token7 :: INT IS NULL + AND t8._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t8.tokens :token7 :: STRING) + GROUP BY + 1 + ) + ), + {% endif %} + + FINAL AS ( + SELECT + * + FROM + complete_lps + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + block_number, + block_timestamp, + tx_hash, + contract_address, + pool_address, + pool_name_heal AS pool_name, + fee, + tick_spacing, + token0, + token1, + token2, + token3, + token4, + token5, + token6, + token7, + tokens, + symbols_heal AS symbols, + decimals_heal AS decimals, + platform, + version, + _id, + _inserted_timestamp +FROM + heal_model +{% endif %} ) SELECT block_number, @@ -403,6 +899,16 @@ SELECT tokens, symbols, decimals, + fee, + tick_spacing, + token0, + token1, + token2, + token3, + token4, + token5, + token6, + token7, _id, _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( diff --git a/models/silver/defi/dex/silver_dex__complete_dex_swaps.sql b/models/silver/defi/dex/silver_dex__complete_dex_swaps.sql index ccd470a2..8a9a1079 100644 --- a/models/silver/defi/dex/silver_dex__complete_dex_swaps.sql +++ b/models/silver/defi/dex/silver_dex__complete_dex_swaps.sql @@ -1,37 +1,14 @@ +-- depends_on: {{ ref('silver__complete_token_prices') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = ['block_number','platform','version'], cluster_by = ['block_timestamp::DATE'], - tags = ['curated','reorg'] + tags = ['curated','reorg','heal'] ) }} -WITH contracts AS ( +WITH sushi AS ( - SELECT - contract_address AS address, - token_symbol AS symbol, - token_name AS NAME, - token_decimals AS decimals - FROM - {{ ref('silver__contracts') }} -), -prices AS ( - SELECT - HOUR, - token_address, - price - FROM - {{ ref('price__ez_prices_hourly') }} - WHERE - token_address IN ( - SELECT - DISTINCT address - FROM - contracts - ) -), -sushi_swaps AS ( SELECT block_number, block_timestamp, @@ -41,49 +18,31 @@ sushi_swaps AS ( origin_to_address, contract_address, event_name, - c1.decimals AS decimals_in, - c1.symbol AS symbol_in, amount_in_unadj, - CASE - WHEN decimals_in IS NULL THEN amount_in_unadj - ELSE (amount_in_unadj / pow(10, decimals_in)) - END AS amount_in, - c2.decimals AS decimals_out, - c2.symbol AS symbol_out, amount_out_unadj, - CASE - WHEN decimals_out IS NULL THEN amount_out_unadj - ELSE (amount_out_unadj / pow(10, decimals_out)) - END AS amount_out, + token_in, + token_out, sender, tx_to, event_index, platform, 'v1' AS version, - token_in, - token_out, - NULL AS pool_name, _log_id, _inserted_timestamp FROM {{ ref('silver_dex__sushi_swaps') }} - s - LEFT JOIN contracts c1 - ON s.token_in = c1.address - LEFT JOIN contracts c2 - ON s.token_out = c2.address -{% if is_incremental() %} +{% if is_incremental() and 'sushi' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) {% endif %} ), -curve_swaps AS ( +curve AS ( SELECT block_number, block_timestamp, @@ -93,66 +52,31 @@ curve_swaps AS ( origin_to_address, contract_address, event_name, - s.tokens_sold AS amount_in_unadj, - s.tokens_bought AS amount_out_unadj, + tokens_sold AS amount_in_unadj, + tokens_bought AS amount_out_unadj, + token_in, + token_out, sender, tx_to, event_index, platform, 'v1' AS version, - token_in, - token_out, - COALESCE( - c1.symbol, - s.symbol_in - ) AS token_symbol_in, - COALESCE( - c2.symbol, - s.symbol_out - ) AS token_symbol_out, - NULL AS pool_name, - c1.decimals AS decimals_in, - CASE - WHEN decimals_in IS NOT NULL THEN s.tokens_sold / pow( - 10, - decimals_in - ) - ELSE s.tokens_sold - END AS amount_in, - c2.decimals AS decimals_out, - CASE - WHEN decimals_out IS NOT NULL THEN s.tokens_bought / pow( - 10, - decimals_out - ) - ELSE s.tokens_bought - END AS amount_out, _log_id, _inserted_timestamp FROM {{ ref('silver_dex__curve_swaps') }} - s - LEFT JOIN contracts c1 - ON c1.address = s.token_in - LEFT JOIN contracts c2 - ON c2.address = s.token_out - WHERE - amount_out <> 0 - AND COALESCE( - token_symbol_in, - 'null' - ) <> COALESCE(token_symbol_out, 'null') -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' - FROM - {{ this }} -) +{% if is_incremental() and 'curve' not in var('HEAL_MODELS') %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) {% endif %} ), -balancer_swaps AS ( +balancer AS ( SELECT block_number, block_timestamp, @@ -161,50 +85,32 @@ balancer_swaps AS ( origin_from_address, origin_to_address, contract_address, - NULL AS pool_name, event_name, - c1.decimals AS decimals_in, - c1.symbol AS symbol_in, amount_in_unadj, amount_out_unadj, - CASE - WHEN decimals_in IS NULL THEN amount_in_unadj - ELSE (amount_in_unadj / pow(10, decimals_in)) - END AS amount_in, - c2.decimals AS decimals_out, - c2.symbol AS symbol_out, - CASE - WHEN decimals_out IS NULL THEN amount_out_unadj - ELSE (amount_out_unadj / pow(10, decimals_out)) - END AS amount_out, + token_in, + token_out, sender, tx_to, event_index, platform, 'v1' AS version, - token_in, - token_out, _log_id, _inserted_timestamp FROM {{ ref('silver_dex__balancer_swaps') }} - s - LEFT JOIN contracts c1 - ON s.token_in = c1.address - LEFT JOIN contracts c2 - ON s.token_out = c2.address -{% if is_incremental() %} +{% if is_incremental() and 'balancer' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) {% endif %} ), -honeyswap_swaps AS ( +honeyswap AS ( SELECT block_number, block_timestamp, @@ -214,49 +120,31 @@ honeyswap_swaps AS ( origin_to_address, contract_address, event_name, - c1.decimals AS decimals_in, - c1.symbol AS symbol_in, amount_in_unadj, - CASE - WHEN decimals_in IS NULL THEN amount_in_unadj - ELSE (amount_in_unadj / pow(10, decimals_in)) - END AS amount_in, - c2.decimals AS decimals_out, - c2.symbol AS symbol_out, amount_out_unadj, - CASE - WHEN decimals_out IS NULL THEN amount_out_unadj - ELSE (amount_out_unadj / pow(10, decimals_out)) - END AS amount_out, + token_in, + token_out, sender, tx_to, event_index, platform, 'v1' AS version, - token_in, - token_out, - NULL AS pool_name, _log_id, _inserted_timestamp FROM {{ ref('silver_dex__honeyswap_swaps') }} - s - LEFT JOIN contracts c1 - ON s.token_in = c1.address - LEFT JOIN contracts c2 - ON s.token_out = c2.address -{% if is_incremental() %} +{% if is_incremental() and 'honeyswap' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) {% endif %} ), -swapr_swaps AS ( +swapr AS ( SELECT block_number, block_timestamp, @@ -266,303 +154,501 @@ swapr_swaps AS ( origin_to_address, contract_address, event_name, - c1.decimals AS decimals_in, - c1.symbol AS symbol_in, amount_in_unadj, - CASE - WHEN decimals_in IS NULL THEN amount_in_unadj - ELSE (amount_in_unadj / pow(10, decimals_in)) - END AS amount_in, - c2.decimals AS decimals_out, - c2.symbol AS symbol_out, amount_out_unadj, - CASE - WHEN decimals_out IS NULL THEN amount_out_unadj - ELSE (amount_out_unadj / pow(10, decimals_out)) - END AS amount_out, + token_in, + token_out, sender, tx_to, event_index, platform, 'v1' AS version, - token_in, - token_out, - NULL AS pool_name, _log_id, _inserted_timestamp FROM {{ ref('silver_dex__swapr_swaps') }} - s - LEFT JOIN contracts c1 - ON s.token_in = c1.address - LEFT JOIN contracts c2 - ON s.token_out = c2.address -{% if is_incremental() %} +{% if is_incremental() and 'swapr' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) {% endif %} ), ---union all standard dex CTEs here (excludes amount_usd) -all_dex_standard AS ( +all_dex AS ( SELECT - block_number, - block_timestamp, - tx_hash, - origin_function_signature, - origin_from_address, - origin_to_address, - contract_address, - pool_name, - event_name, - amount_in_unadj, - amount_out_unadj, - amount_in, - amount_out, - sender, - tx_to, - event_index, - platform, - version, - token_in, - token_out, - symbol_in, - symbol_out, - decimals_in, - decimals_out, - _log_id, - _inserted_timestamp + * FROM - balancer_swaps + balancer UNION ALL SELECT - block_number, - block_timestamp, - tx_hash, - origin_function_signature, - origin_from_address, - origin_to_address, - contract_address, - pool_name, - event_name, - amount_in_unadj, - amount_out_unadj, - amount_in, - amount_out, - sender, - tx_to, - event_index, - platform, - version, - token_in, - token_out, - symbol_in, - symbol_out, - decimals_in, - decimals_out, - _log_id, - _inserted_timestamp + * FROM - sushi_swaps + sushi UNION ALL SELECT - block_number, - block_timestamp, - tx_hash, - origin_function_signature, - origin_from_address, - origin_to_address, - contract_address, - pool_name, - event_name, - amount_in_unadj, - amount_out_unadj, - amount_in, - amount_out, - sender, - tx_to, - event_index, - platform, - version, - token_in, - token_out, - token_symbol_in AS symbol_in, - token_symbol_out AS symbol_out, - decimals_in, - decimals_out, - _log_id, - _inserted_timestamp + * FROM - curve_swaps + curve UNION ALL SELECT - block_number, - block_timestamp, - tx_hash, - origin_function_signature, - origin_from_address, - origin_to_address, - contract_address, - pool_name, - event_name, - amount_in_unadj, - amount_out_unadj, - amount_in, - amount_out, - sender, - tx_to, - event_index, - platform, - version, - token_in, - token_out, - symbol_in, - symbol_out, - decimals_in, - decimals_out, - _log_id, - _inserted_timestamp + * FROM - honeyswap_swaps + honeyswap UNION ALL SELECT - block_number, - block_timestamp, - tx_hash, + * + FROM + swapr +), +complete_dex_swaps AS ( + SELECT + s.block_number, + s.block_timestamp, + s.tx_hash, origin_function_signature, origin_from_address, origin_to_address, - contract_address, - pool_name, + s.contract_address, event_name, + token_in, + c1.token_decimals AS decimals_in, + c1.token_symbol AS symbol_in, amount_in_unadj, + CASE + WHEN decimals_in IS NULL THEN amount_in_unadj + ELSE (amount_in_unadj / pow(10, decimals_in)) + END AS amount_in, + CASE + WHEN decimals_in IS NOT NULL THEN amount_in * p1.price + ELSE NULL + END AS amount_in_usd, + token_out, + c2.token_decimals AS decimals_out, + c2.token_symbol AS symbol_out, amount_out_unadj, - amount_in, - amount_out, + CASE + WHEN decimals_out IS NULL THEN amount_out_unadj + ELSE (amount_out_unadj / pow(10, decimals_out)) + END AS amount_out, + CASE + WHEN decimals_out IS NOT NULL THEN amount_out * p2.price + ELSE NULL + END AS amount_out_usd, + CASE + WHEN lp.pool_name IS NULL THEN CONCAT( + LEAST( + COALESCE( + symbol_in, + CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42)) + ), + COALESCE( + symbol_out, + CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42)) + ) + ), + '-', + GREATEST( + COALESCE( + symbol_in, + CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42)) + ), + COALESCE( + symbol_out, + CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42)) + ) + ) + ) + ELSE lp.pool_name + END AS pool_name, sender, tx_to, event_index, - platform, - version, - token_in, - token_out, - symbol_in, - symbol_out, - decimals_in, - decimals_out, - _log_id, - _inserted_timestamp + s.platform, + s.version, + s._log_id, + s._inserted_timestamp FROM - swapr_swaps + all_dex s + LEFT JOIN {{ ref('silver__contracts') }} + c1 + ON s.token_in = c1.contract_address + LEFT JOIN {{ ref('silver__contracts') }} + c2 + ON s.token_out = c2.contract_address + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p1 + ON s.token_in = p1.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p1.hour + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p2 + ON s.token_out = p2.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p2.hour + LEFT JOIN {{ ref('silver_dex__complete_dex_liquidity_pools') }} + lp + ON s.contract_address = lp.pool_address ), -FINAL AS ( + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( SELECT - block_number, - block_timestamp, - tx_hash, + t0.block_number, + t0.block_timestamp, + t0.tx_hash, origin_function_signature, origin_from_address, origin_to_address, - contract_address, - pool_name, + t0.contract_address, event_name, + token_in, + c1.token_decimals AS decimals_in, + c1.token_symbol AS symbol_in, amount_in_unadj, - amount_in, CASE - WHEN s.decimals_in IS NOT NULL THEN ROUND( - amount_in * p1.price, - 2 - ) + WHEN c1.token_decimals IS NULL THEN amount_in_unadj + ELSE (amount_in_unadj / pow(10, c1.token_decimals)) + END AS amount_in_heal, + CASE + WHEN c1.token_decimals IS NOT NULL THEN amount_in_heal * p1.price ELSE NULL - END AS amount_in_usd, + END AS amount_in_usd_heal, + token_out, + c2.token_decimals AS decimals_out, + c2.token_symbol AS symbol_out, amount_out_unadj, - amount_out, CASE - WHEN s.decimals_out IS NOT NULL THEN ROUND( - amount_out * p2.price, - 2 - ) + WHEN c2.token_decimals IS NULL THEN amount_out_unadj + ELSE (amount_out_unadj / pow(10, c2.token_decimals)) + END AS amount_out_heal, + CASE + WHEN c2.token_decimals IS NOT NULL THEN amount_out_heal * p2.price ELSE NULL - END AS amount_out_usd, + END AS amount_out_usd_heal, + CASE + WHEN lp.pool_name IS NULL THEN CONCAT( + LEAST( + COALESCE( + c1.token_symbol, + CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42)) + ), + COALESCE( + c2.token_symbol, + CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42)) + ) + ), + '-', + GREATEST( + COALESCE( + c1.token_symbol, + CONCAT(SUBSTRING(token_in, 1, 5), '...', SUBSTRING(token_in, 39, 42)) + ), + COALESCE( + c2.token_symbol, + CONCAT(SUBSTRING(token_out, 1, 5), '...', SUBSTRING(token_out, 39, 42)) + ) + ) + ) + ELSE lp.pool_name + END AS pool_name_heal, sender, tx_to, event_index, - platform, - version, - token_in, - token_out, - symbol_in, - symbol_out, - _log_id, - _inserted_timestamp + t0.platform, + t0.version, + t0._log_id, + t0._inserted_timestamp FROM - all_dex_standard s - LEFT JOIN prices p1 - ON s.token_in = p1.token_address + {{ this }} + t0 + LEFT JOIN {{ ref('silver__contracts') }} + c1 + ON t0.token_in = c1.contract_address + LEFT JOIN {{ ref('silver__contracts') }} + c2 + ON t0.token_out = c2.contract_address + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p1 + ON t0.token_in = p1.token_address AND DATE_TRUNC( 'hour', block_timestamp ) = p1.hour - LEFT JOIN prices p2 - ON s.token_out = p2.token_address + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p2 + ON t0.token_out = p2.token_address AND DATE_TRUNC( 'hour', block_timestamp ) = p2.hour + LEFT JOIN {{ ref('silver_dex__complete_dex_liquidity_pools') }} + lp + ON t0.contract_address = lp.pool_address + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform, + '-', + t1.version + ) + FROM + {{ this }} + t1 + WHERE + t1.decimals_in IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t1.token_in) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t2.block_number, + '-', + t2.platform, + '-', + t2.version + ) + FROM + {{ this }} + t2 + WHERE + t2.decimals_out IS NULL + AND t2._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__contracts') }} C + WHERE + C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND C.token_decimals IS NOT NULL + AND C.contract_address = t2.token_out) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t3.block_number, + '-', + t3.platform, + '-', + t3.version + ) + FROM + {{ this }} + t3 + WHERE + t3.amount_in_usd IS NULL + AND t3._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t3.token_in + AND p.hour = DATE_TRUNC( + 'hour', + t3.block_timestamp + ) + ) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform, + '-', + t0.version + ) IN ( + SELECT + CONCAT( + t4.block_number, + '-', + t4.platform, + '-', + t4.version + ) + FROM + {{ this }} + t4 + WHERE + t4.amount_out_usd IS NULL + AND t4._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t4.token_out + AND p.hour = DATE_TRUNC( + 'hour', + t4.block_timestamp + ) + ) + GROUP BY + 1 + ) + ), + {% endif %} + + FINAL AS ( + SELECT + * + FROM + complete_dex_swaps + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + block_number, + block_timestamp, + tx_hash, + origin_function_signature, + origin_from_address, + origin_to_address, + contract_address, + event_name, + token_in, + decimals_in, + symbol_in, + amount_in_unadj, + amount_in_heal AS amount_in, + amount_in_usd_heal AS amount_in_usd, + token_out, + decimals_out, + symbol_out, + amount_out_unadj, + amount_out_heal AS amount_out, + amount_out_usd_heal AS amount_out_usd, + pool_name_heal AS pool_name, + sender, + tx_to, + event_index, + platform, + version, + _log_id, + _inserted_timestamp +FROM + heal_model +{% endif %} ) SELECT - f.block_number, - f.block_timestamp, - f.tx_hash, + block_number, + block_timestamp, + tx_hash, origin_function_signature, origin_from_address, origin_to_address, - f.contract_address, - CASE - WHEN f.pool_name IS NULL THEN p.pool_name - ELSE f.pool_name - END AS pool_name, + contract_address, + pool_name, event_name, amount_in_unadj, amount_in, - CASE - WHEN amount_out_usd IS NULL - OR ABS((amount_in_usd - amount_out_usd) / NULLIF(amount_out_usd, 0)) > 0.75 - OR ABS((amount_in_usd - amount_out_usd) / NULLIF(amount_in_usd, 0)) > 0.75 THEN NULL - ELSE amount_in_usd - END AS amount_in_usd, + amount_in_usd, amount_out_unadj, amount_out, - CASE - WHEN amount_in_usd IS NULL - OR ABS((amount_out_usd - amount_in_usd) / NULLIF(amount_in_usd, 0)) > 0.75 - OR ABS((amount_out_usd - amount_in_usd) / NULLIF(amount_out_usd, 0)) > 0.75 THEN NULL - ELSE amount_out_usd - END AS amount_out_usd, + amount_out_usd, sender, tx_to, event_index, - f.platform, - f.version, + platform, + version, token_in, token_out, symbol_in, symbol_out, - f._log_id, - f._inserted_timestamp, + decimals_in, + decimals_out, + _log_id, + _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( - ['f.tx_hash','f.event_index'] + ['tx_hash','event_index'] ) }} AS complete_dex_swaps_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - FINAL f - LEFT JOIN {{ ref('silver_dex__complete_dex_liquidity_pools') }} - p - ON f.contract_address = p.pool_address + FINAL qualify (ROW_NUMBER() over (PARTITION BY _log_id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/defi/dex/sushi/silver_dex__sushi_pools.sql b/models/silver/defi/dex/sushi/silver_dex__sushi_pools.sql index 9c7378d7..471435b7 100644 --- a/models/silver/defi/dex/sushi/silver_dex__sushi_pools.sql +++ b/models/silver/defi/dex/sushi/silver_dex__sushi_pools.sql @@ -27,6 +27,7 @@ WITH pool_creation AS ( WHERE contract_address = LOWER('0xc35DADB65012eC5796536bD9864eD8773aBc74C4') AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( diff --git a/models/silver/defi/dex/swapr/silver_dex__swapr_pools.sql b/models/silver/defi/dex/swapr/silver_dex__swapr_pools.sql index f4604d54..f06fa30f 100644 --- a/models/silver/defi/dex/swapr/silver_dex__swapr_pools.sql +++ b/models/silver/defi/dex/swapr/silver_dex__swapr_pools.sql @@ -27,6 +27,7 @@ WITH pool_creation AS ( WHERE contract_address = LOWER('0x5D48C95AdfFD4B40c1AAADc4e08fc44117E02179') AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated + AND tx_status = 'SUCCESS' {% if is_incremental() %} AND _inserted_timestamp >= ( diff --git a/models/silver/defi/lending/complete_lending/silver__complete_lending_borrows.sql b/models/silver/defi/lending/complete_lending/silver__complete_lending_borrows.sql index 7dbcdc44..4dbc7f0d 100644 --- a/models/silver/defi/lending/complete_lending/silver__complete_lending_borrows.sql +++ b/models/silver/defi/lending/complete_lending/silver__complete_lending_borrows.sql @@ -1,9 +1,10 @@ +-- depends_on: {{ ref('silver__complete_token_prices') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = ['block_number','platform'], cluster_by = ['block_timestamp::DATE'], - tags = ['reorg','curated'] + tags = ['reorg','curated','heal'] ) }} WITH aave AS ( @@ -30,13 +31,13 @@ WITH aave AS ( FROM {{ ref('silver__aave_borrows') }} A -{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %} WHERE A._inserted_timestamp >= ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -65,13 +66,13 @@ agave AS ( FROM {{ ref('silver__agave_borrows') }} A -{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %} WHERE A._inserted_timestamp >= ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -100,13 +101,13 @@ spark AS ( FROM {{ ref('silver__spark_borrows') }} A -{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %} WHERE A._inserted_timestamp >= ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -135,13 +136,13 @@ realt AS ( FROM {{ ref('silver__realt_borrows') }} A -{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %} WHERE A._inserted_timestamp >= ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -168,7 +169,7 @@ borrow_union AS ( FROM realt ), -FINAL AS ( +complete_lending_borrows AS ( SELECT tx_hash, block_number, @@ -178,10 +179,7 @@ FINAL AS ( origin_to_address, origin_function_signature, b.contract_address, - CASE - WHEN platform = 'Compound V3' THEN 'Withdraw' - ELSE 'Borrow' - END AS event_name, + 'Borrow' AS event_name, borrower, protocol_market, b.token_address, @@ -205,8 +203,126 @@ FINAL AS ( 'hour', block_timestamp ) = p.hour - LEFT JOIN {{ ref('silver__contracts') }} C - ON b.token_address = C.contract_address +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( + SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + t0.contract_address, + event_name, + borrower, + protocol_market, + t0.token_address, + t0.token_symbol, + amount_unadj, + amount, + ROUND( + amount * p.price, + 2 + ) AS amount_usd_heal, + platform, + t0.blockchain, + t0._LOG_ID, + t0._INSERTED_TIMESTAMP + FROM + {{ this }} + t0 + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p + ON t0.token_address = p.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p.hour + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform + ) + FROM + {{ this }} + t1 + WHERE + t1.amount_usd IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t1.token_address + AND p.hour = DATE_TRUNC( + 'hour', + t1.block_timestamp + ) + ) + GROUP BY + 1 + ) +), +{% endif %} + +FINAL AS ( + SELECT + * + FROM + complete_lending_borrows + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + contract_address, + event_name, + borrower, + protocol_market, + token_address, + token_symbol, + amount_unadj, + amount, + amount_usd_heal AS amount_usd, + platform, + blockchain, + _LOG_ID, + _INSERTED_TIMESTAMP +FROM + heal_model +{% endif %} ) SELECT *, diff --git a/models/silver/defi/lending/complete_lending/silver__complete_lending_deposits.sql b/models/silver/defi/lending/complete_lending/silver__complete_lending_deposits.sql index 60421216..4cb3bfae 100644 --- a/models/silver/defi/lending/complete_lending/silver__complete_lending_deposits.sql +++ b/models/silver/defi/lending/complete_lending/silver__complete_lending_deposits.sql @@ -1,9 +1,10 @@ +-- depends_on: {{ ref('silver__complete_token_prices') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = ['block_number','platform'], cluster_by = ['block_timestamp::DATE'], - tags = ['reorg','curated'] + tags = ['reorg','curated','heal'] ) }} WITH aave AS ( @@ -30,11 +31,11 @@ WITH aave AS ( FROM {{ ref('silver__aave_deposits') }} -{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -63,11 +64,11 @@ agave AS ( FROM {{ ref('silver__agave_deposits') }} -{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -96,11 +97,11 @@ spark AS ( FROM {{ ref('silver__spark_deposits') }} -{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -129,11 +130,11 @@ realt AS ( FROM {{ ref('silver__realt_deposits') }} -{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -160,7 +161,7 @@ deposit_union AS ( FROM realt ), -FINAL AS ( +complete_lending_deposits AS ( SELECT tx_hash, block_number, @@ -171,7 +172,6 @@ FINAL AS ( origin_function_signature, A.contract_address, CASE - WHEN platform = 'Compound V3' THEN 'SupplyCollateral' WHEN platform = 'Aave V3' THEN 'Supply' ELSE 'Deposit' END AS event_name, @@ -198,8 +198,126 @@ FINAL AS ( 'hour', block_timestamp ) = p.hour - LEFT JOIN {{ ref('silver__contracts') }} C - ON A.token_address = C.contract_address +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( + SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + t0.contract_address, + event_name, + protocol_market, + depositor, + t0.token_address, + t0.token_symbol, + amount_unadj, + amount, + ROUND( + amount * p.price, + 2 + ) AS amount_usd_heal, + platform, + t0.blockchain, + t0._LOG_ID, + t0._INSERTED_TIMESTAMP + FROM + {{ this }} + t0 + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p + ON t0.token_address = p.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p.hour + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform + ) + FROM + {{ this }} + t1 + WHERE + t1.amount_usd IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t1.token_address + AND p.hour = DATE_TRUNC( + 'hour', + t1.block_timestamp + ) + ) + GROUP BY + 1 + ) +), +{% endif %} + +FINAL AS ( + SELECT + * + FROM + complete_lending_deposits + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + contract_address, + event_name, + protocol_market, + depositor, + token_address, + token_symbol, + amount_unadj, + amount, + amount_usd_heal AS amount_usd, + platform, + blockchain, + _LOG_ID, + _INSERTED_TIMESTAMP +FROM + heal_model +{% endif %} ) SELECT *, diff --git a/models/silver/defi/lending/complete_lending/silver__complete_lending_flashloans.sql b/models/silver/defi/lending/complete_lending/silver__complete_lending_flashloans.sql index 460b08c3..72a2bb7c 100644 --- a/models/silver/defi/lending/complete_lending/silver__complete_lending_flashloans.sql +++ b/models/silver/defi/lending/complete_lending/silver__complete_lending_flashloans.sql @@ -1,9 +1,10 @@ +-- depends_on: {{ ref('silver__complete_token_prices') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = ['block_number','platform'], cluster_by = ['block_timestamp::DATE'], - tags = ['reorg','curated'] + tags = ['reorg','curated','heal'] ) }} WITH aave AS ( @@ -33,11 +34,11 @@ WITH aave AS ( FROM {{ ref('silver__aave_flashloans') }} -{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -69,11 +70,11 @@ spark AS ( FROM {{ ref('silver__spark_flashloans') }} -{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -105,11 +106,11 @@ agave AS ( FROM {{ ref('silver__agave_flashloans') }} -{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -141,11 +142,11 @@ realt AS ( FROM {{ ref('silver__realt_flashloans') }} -{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -172,7 +173,7 @@ flashloan_union AS ( FROM realt ), -FINAL AS ( +complete_lending_flashloans AS ( SELECT tx_hash, block_number, @@ -187,7 +188,7 @@ FINAL AS ( initiator_address AS initiator, target_address AS target, f.token_address AS flashloan_token, - token_symbol AS flashloan_token_symbol, + f.symbol AS flashloan_token_symbol, flashloan_amount_unadj, flashloan_amount, ROUND( @@ -213,8 +214,179 @@ FINAL AS ( 'hour', block_timestamp ) = p.hour - LEFT JOIN {{ ref('silver__contracts') }} C - ON f.token_address = C.contract_address +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( + SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + t0.contract_address, + event_name, + protocol_market, + initiator, + target, + flashloan_token, + flashloan_token_symbol, + flashloan_amount_unadj, + flashloan_amount, + ROUND( + flashloan_amount * p.price, + 2 + ) AS flashloan_amount_usd_heal, + premium_amount_unadj, + premium_amount, + ROUND( + premium_amount * p.price, + 2 + ) AS premium_amount_usd_heal, + platform, + t0.blockchain, + t0._LOG_ID, + t0._INSERTED_TIMESTAMP + FROM + {{ this }} + t0 + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p + ON t0.flashloan_token = p.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p.hour + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform + ) + FROM + {{ this }} + t1 + WHERE + t1.flashloan_amount_usd IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t1.flashloan_token + AND p.hour = DATE_TRUNC( + 'hour', + t1.block_timestamp + ) + ) + GROUP BY + 1 + ) + OR CONCAT( + t0.block_number, + '-', + t0.platform + ) IN ( + SELECT + CONCAT( + t2.block_number, + '-', + t2.platform + ) + FROM + {{ this }} + t2 + WHERE + t2.premium_amount_usd IS NULL + AND t2._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t2.flashloan_token + AND p.hour = DATE_TRUNC( + 'hour', + t2.block_timestamp + ) + ) + GROUP BY + 1 + ) +), +{% endif %} + +FINAL AS ( + SELECT + * + FROM + complete_lending_flashloans + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + contract_address, + event_name, + protocol_market, + initiator, + target, + flashloan_token, + flashloan_token_symbol, + flashloan_amount_unadj, + flashloan_amount, + flashloan_amount_usd_heal AS flashloan_amount_usd, + premium_amount_unadj, + premium_amount, + premium_amount_usd_heal AS premium_amount_usd, + platform, + blockchain, + _LOG_ID, + _INSERTED_TIMESTAMP +FROM + heal_model +{% endif %} ) SELECT *, diff --git a/models/silver/defi/lending/complete_lending/silver__complete_lending_liquidations.sql b/models/silver/defi/lending/complete_lending/silver__complete_lending_liquidations.sql index 4513364c..2bb61bef 100644 --- a/models/silver/defi/lending/complete_lending/silver__complete_lending_liquidations.sql +++ b/models/silver/defi/lending/complete_lending/silver__complete_lending_liquidations.sql @@ -1,9 +1,10 @@ +-- depends_on: {{ ref('silver__complete_token_prices') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = ['block_number','platform'], cluster_by = ['block_timestamp::DATE'], - tags = ['reorg','curated'] + tags = ['reorg','curated','heal'] ) }} WITH aave AS ( @@ -34,11 +35,11 @@ WITH aave AS ( FROM {{ ref('silver__aave_liquidations') }} -{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -71,11 +72,11 @@ spark AS ( FROM {{ ref('silver__spark_liquidations') }} -{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -108,11 +109,11 @@ agave AS ( FROM {{ ref('silver__agave_liquidations') }} -{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -145,11 +146,11 @@ realt AS ( FROM {{ ref('silver__realt_liquidations') }} -{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -176,40 +177,7 @@ liquidation_union AS ( FROM realt ), -contracts AS ( - SELECT - * - FROM - {{ ref('silver__contracts') }} C - WHERE - C.contract_address IN ( - SELECT - DISTINCT(collateral_asset) AS asset - FROM - liquidation_union - ) -), -prices AS ( - SELECT - * - FROM - {{ ref('price__ez_prices_hourly') }} - p - WHERE - token_address IN ( - SELECT - DISTINCT(collateral_asset) AS asset - FROM - liquidation_union - ) - AND HOUR > ( - SELECT - MIN(block_timestamp) - FROM - liquidation_union - ) -), -FINAL AS ( +complete_lending_liquidations AS ( SELECT tx_hash, block_number, @@ -219,10 +187,7 @@ FINAL AS ( origin_to_address, origin_function_signature, A.contract_address, - CASE - WHEN platform = 'Compound V3' THEN 'AbsorbCollateral' - ELSE 'LiquidationCall' - END AS event_name, + 'LiquidationCall' AS event_name, liquidator, borrower, protocol_collateral_asset AS protocol_market, @@ -230,16 +195,10 @@ FINAL AS ( collateral_asset_symbol AS collateral_token_symbol, amount_unadj, liquidated_amount AS amount, - CASE - WHEN platform <> 'Compound V3' THEN ROUND( - liquidated_amount * p.price, - 2 - ) - ELSE ROUND( - liquidated_amount_usd, - 2 - ) - END AS amount_usd, + ROUND( + liquidated_amount * p.price, + 2 + ) AS amount_usd, debt_asset AS debt_token, debt_asset_symbol AS debt_token_symbol, platform, @@ -248,14 +207,139 @@ FINAL AS ( A._INSERTED_TIMESTAMP FROM liquidation_union A - LEFT JOIN prices p + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p ON collateral_asset = p.token_address AND DATE_TRUNC( 'hour', block_timestamp ) = p.hour - LEFT JOIN contracts C - ON collateral_asset = C.contract_address +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( + SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + t0.contract_address, + event_name, + liquidator, + borrower, + protocol_market, + collateral_token, + collateral_token_symbol, + amount_unadj, + amount, + ROUND( + amount * p.price, + 2 + ) AS amount_usd_heal, + debt_token, + debt_token_symbol, + platform, + t0.blockchain, + t0._LOG_ID, + t0._INSERTED_TIMESTAMP + FROM + {{ this }} + t0 + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p + ON t0.collateral_token = p.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p.hour + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform + ) + FROM + {{ this }} + t1 + WHERE + t1.amount_usd IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t1.collateral_token + AND p.hour = DATE_TRUNC( + 'hour', + t1.block_timestamp + ) + ) + GROUP BY + 1 + ) +), +{% endif %} + +FINAL AS ( + SELECT + * + FROM + complete_lending_liquidations + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + contract_address, + event_name, + liquidator, + borrower, + protocol_market, + collateral_token, + collateral_token_symbol, + amount_unadj, + amount, + amount_usd_heal AS amount_usd, + debt_token, + debt_token_symbol, + platform, + blockchain, + _LOG_ID, + _INSERTED_TIMESTAMP +FROM + heal_model +{% endif %} ) SELECT *, diff --git a/models/silver/defi/lending/complete_lending/silver__complete_lending_repayments.sql b/models/silver/defi/lending/complete_lending/silver__complete_lending_repayments.sql index 04dd391c..5e3d9562 100644 --- a/models/silver/defi/lending/complete_lending/silver__complete_lending_repayments.sql +++ b/models/silver/defi/lending/complete_lending/silver__complete_lending_repayments.sql @@ -1,9 +1,10 @@ +-- depends_on: {{ ref('silver__complete_token_prices') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = ['block_number','platform'], cluster_by = ['block_timestamp::DATE'], - tags = ['reorg','curated'] + tags = ['reorg','curated','heal'] ) }} WITH aave AS ( @@ -31,11 +32,11 @@ WITH aave AS ( FROM {{ ref('silver__aave_repayments') }} -{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -65,11 +66,11 @@ spark AS ( FROM {{ ref('silver__spark_repayments') }} -{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -99,11 +100,11 @@ agave AS ( FROM {{ ref('silver__agave_repayments') }} -{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -133,11 +134,11 @@ realt AS ( FROM {{ ref('silver__realt_repayments') }} -{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT - MAX(_inserted_timestamp) - INTERVAL '36 hours' + MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -164,7 +165,7 @@ repayments_union AS ( FROM realt ), -FINAL AS ( +complete_lending_repayments AS ( SELECT tx_hash, block_number, @@ -174,10 +175,7 @@ FINAL AS ( origin_to_address, origin_function_signature, A.contract_address, - CASE - WHEN platform = 'Compound V3' THEN 'Supply' - ELSE 'Repay' - END AS event_name, + 'Repay' AS event_name, protocol_market, payer_address AS payer, borrower, @@ -202,8 +200,128 @@ FINAL AS ( 'hour', block_timestamp ) = p.hour - LEFT JOIN {{ ref('silver__contracts') }} C - ON A.token_address = C.contract_address +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( + SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + t0.contract_address, + event_name, + protocol_market, + payer, + borrower, + t0.token_address, + t0.token_symbol, + amount_unadj, + amount, + ROUND( + amount * p.price, + 2 + ) AS amount_usd_heal, + platform, + t0.blockchain, + t0._LOG_ID, + t0._INSERTED_TIMESTAMP + FROM + {{ this }} + t0 + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p + ON t0.token_address = p.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p.hour + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform + ) + FROM + {{ this }} + t1 + WHERE + t1.amount_usd IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t1.token_address + AND p.hour = DATE_TRUNC( + 'hour', + t1.block_timestamp + ) + ) + GROUP BY + 1 + ) +), +{% endif %} + +FINAL AS ( + SELECT + * + FROM + complete_lending_repayments + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + contract_address, + event_name, + protocol_market, + payer, + borrower, + token_address, + token_symbol, + amount_unadj, + amount, + amount_usd_heal AS amount_usd, + platform, + blockchain, + _LOG_ID, + _INSERTED_TIMESTAMP +FROM + heal_model +{% endif %} ) SELECT *, diff --git a/models/silver/defi/lending/complete_lending/silver__complete_lending_withdraws.sql b/models/silver/defi/lending/complete_lending/silver__complete_lending_withdraws.sql index 0d86078e..2bf418d2 100644 --- a/models/silver/defi/lending/complete_lending/silver__complete_lending_withdraws.sql +++ b/models/silver/defi/lending/complete_lending/silver__complete_lending_withdraws.sql @@ -1,9 +1,10 @@ +-- depends_on: {{ ref('silver__complete_token_prices') }} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = ['block_number','platform'], cluster_by = ['block_timestamp::DATE'], - tags = ['reorg','curated'] + tags = ['reorg','curated','heal'] ) }} WITH aave AS ( @@ -30,13 +31,13 @@ WITH aave AS ( FROM {{ ref('silver__aave_withdraws') }} -{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -65,13 +66,13 @@ spark AS ( FROM {{ ref('silver__spark_withdraws') }} -{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -100,13 +101,13 @@ agave AS ( FROM {{ ref('silver__agave_withdraws') }} -{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -135,13 +136,13 @@ realt AS ( FROM {{ ref('silver__realt_withdraws') }} -{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %} +{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %} WHERE _inserted_timestamp >= ( SELECT MAX( _inserted_timestamp - ) - INTERVAL '36 hours' + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' FROM {{ this }} ) @@ -168,7 +169,7 @@ withdraws_union AS ( FROM realt ), -FINAL AS ( +complete_lending_withdraws AS ( SELECT tx_hash, block_number, @@ -178,10 +179,7 @@ FINAL AS ( origin_to_address, origin_function_signature, A.contract_address, - CASE - WHEN platform = 'Compound V3' THEN 'WithdrawCollateral' - ELSE 'Withdraw' - END AS event_name, + 'Withdraw' AS event_name, protocol_market, depositor_address AS depositor, A.token_address, @@ -205,8 +203,126 @@ FINAL AS ( 'hour', block_timestamp ) = p.hour - LEFT JOIN {{ ref('silver__contracts') }} C - ON A.token_address = C.contract_address +), + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +heal_model AS ( + SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + t0.contract_address, + event_name, + protocol_market, + depositor, + t0.token_address, + t0.token_symbol, + amount_unadj, + amount, + ROUND( + amount * p.price, + 2 + ) AS amount_usd_heal, + platform, + t0.blockchain, + t0._log_id, + t0._inserted_timestamp + FROM + {{ this }} + t0 + LEFT JOIN {{ ref('price__ez_prices_hourly') }} + p + ON t0.token_address = p.token_address + AND DATE_TRUNC( + 'hour', + block_timestamp + ) = p.hour + WHERE + CONCAT( + t0.block_number, + '-', + t0.platform + ) IN ( + SELECT + CONCAT( + t1.block_number, + '-', + t1.platform + ) + FROM + {{ this }} + t1 + WHERE + t1.amount_usd IS NULL + AND t1._inserted_timestamp < ( + SELECT + MAX( + _inserted_timestamp + ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}' + FROM + {{ this }} + ) + AND EXISTS ( + SELECT + 1 + FROM + {{ ref('silver__complete_token_prices') }} + p + WHERE + p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE()) + AND p.price IS NOT NULL + AND p.token_address = t1.token_address + AND p.hour = DATE_TRUNC( + 'hour', + t1.block_timestamp + ) + ) + GROUP BY + 1 + ) +), +{% endif %} + +FINAL AS ( + SELECT + * + FROM + complete_lending_withdraws + +{% if is_incremental() and var( + 'HEAL_MODEL' +) %} +UNION ALL +SELECT + tx_hash, + block_number, + block_timestamp, + event_index, + origin_from_address, + origin_to_address, + origin_function_signature, + contract_address, + event_name, + protocol_market, + depositor, + token_address, + token_symbol, + amount_unadj, + amount, + amount_usd_heal AS amount_usd, + platform, + blockchain, + _log_id, + _inserted_timestamp +FROM + heal_model +{% endif %} ) SELECT *,