diff --git a/data/testing__swaps_intermediate_raydium_cpmm.csv b/data/testing__swaps_intermediate_raydium_cpmm.csv new file mode 100644 index 000000000..245dd88df --- /dev/null +++ b/data/testing__swaps_intermediate_raydium_cpmm.csv @@ -0,0 +1,3 @@ +tx_id,swapper,swap_from_amount,swap_from_mint,swap_to_amount,swap_to_mint,swap_index +3o7EsrU2JaxUGcdDzLEUXAptsaq9M2xeub6RJMcXZ1zfEmsNs4QzLGFD64j3L3KpMgq7y43VrxwL2ZxRZd1tx5S5,5JGSn76EcwF1dkiF213iptW5AaKQwYnnY6WKaTbv7eyE,0.001343562,So11111111111111111111111111111111111111112,6501189.665,HECZgs7unJtJ2626a1FV4BnmVr5BphuZWobVavdmpYnY,1 +5c8f2zz7WVULKouc4U9jmikkmnY1C9eFMBRvAb3wD7JB1uckLdNGmWF2tv8ZJhzEUKHiAmkzQR25YhqMbT6fy4NT,8ZnvqGzD8AeJ158k4bMYNkbELn35x6uWoZZogGHArpeQ,1.0031e-05,So11111111111111111111111111111111111111112,0.001231,3Ae4HCpJqYdDSY4eHsSZyYhwp1zy2oerk5CFpV26eaMx,1 \ No newline at end of file diff --git a/models/gold/defi/defi__fact_swaps.sql b/models/gold/defi/defi__fact_swaps.sql index e9f87ceb5..5f72dd9f4 100644 --- a/models/gold/defi/defi__fact_swaps.sql +++ b/models/gold/defi/defi__fact_swaps.sql @@ -289,6 +289,28 @@ FROM WHERE modified_timestamp >= '{{ max_modified_timestamp }}' {% endif %} +UNION ALL +SELECT + block_timestamp, + block_id, + tx_id, + succeeded, + swapper, + swap_from_amount, + swap_from_mint, + swap_to_amount, + swap_to_mint, + program_id, + swap_index, + swaps_intermediate_raydium_cpmm_id as fact_swaps_id, + inserted_timestamp, + modified_timestamp +FROM + {{ ref('silver__swaps_intermediate_raydium_cpmm') }} +{% if is_incremental() %} +WHERE + modified_timestamp >= '{{ max_modified_timestamp }}' +{% endif %} ) select diff --git a/models/gold/defi/defi__fact_swaps.yml b/models/gold/defi/defi__fact_swaps.yml index ec39c63a5..14110e4c9 100644 --- a/models/gold/defi/defi__fact_swaps.yml +++ b/models/gold/defi/defi__fact_swaps.yml @@ -20,6 +20,7 @@ models: - 'silver__swaps_intermediate_raydium_stable' - 'silver__swaps_intermediate_raydium_v4_amm' - 'silver__swaps_pumpfun' + - 'silver__swaps_intermediate_raydium_cpmm' id_column: 'tx_id' columns: - name: BLOCK_TIMESTAMP diff --git a/models/silver/parser/silver__decoded_instructions_combined.yml b/models/silver/parser/silver__decoded_instructions_combined.yml index ae2ef57f6..3dc18ead2 100644 --- a/models/silver/parser/silver__decoded_instructions_combined.yml +++ b/models/silver/parser/silver__decoded_instructions_combined.yml @@ -176,6 +176,16 @@ models: AND succeeded and _inserted_timestamp between current_date - 7 and current_timestamp() - INTERVAL '4 HOUR' to_condition: "_inserted_timestamp >= current_date - 7" + - dbt_utils.relationships_where: + name: dbt_utils_relationships_where_silver__decoded_instructions_combined_swaps_intermediate_raydium_cpmm_tx_id + to: ref('silver__swaps_intermediate_raydium_cpmm') + field: tx_id + from_condition: > + program_id = 'CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C' + AND event_type IN ('swapBaseInput','swapBaseOutput') + AND succeeded + and _inserted_timestamp between current_date - 7 and current_timestamp() - INTERVAL '4 HOUR' + to_condition: "_inserted_timestamp >= current_date - 7" - name: SIGNERS description: "{{ doc('signers') }}" - name: SUCCEEDED diff --git a/models/silver/swaps/raydium/silver__swaps_intermediate_raydium_cpmm.sql b/models/silver/swaps/raydium/silver__swaps_intermediate_raydium_cpmm.sql new file mode 100644 index 000000000..534aeced2 --- /dev/null +++ b/models/silver/swaps/raydium/silver__swaps_intermediate_raydium_cpmm.sql @@ -0,0 +1,162 @@ +-- depends_on: {{ ref('silver__decoded_instructions_combined') }} + +{{ config( + materialized = 'incremental', + unique_key = ['swaps_intermediate_raydium_cpmm_id'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE'], + tags = ['scheduled_non_core'] +) }} + +{% if execute %} + {% set base_query %} + CREATE OR REPLACE TEMPORARY TABLE silver.swaps_intermediate_raydium_cpmm__intermediate_tmp AS + SELECT + block_timestamp, + block_id, + tx_id, + succeeded, + INDEX, + inner_index, + program_id, + event_type, + decoded_instruction, + _inserted_timestamp + FROM + {{ ref('silver__decoded_instructions_combined') }} + WHERE + program_id = 'CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C' + AND event_type in ('swapBaseInput','swapBaseOutput') + + {% if is_incremental() %} + AND _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '1 hour' + FROM + {{ this }} + ) + {% else %} + AND _inserted_timestamp::DATE >= '2024-10-11' + {% endif %} + {% endset %} + + {% do run_query(base_query) %} + {% set between_stmts = fsc_utils.dynamic_range_predicate( + "silver.swaps_intermediate_raydium_cpmm__intermediate_tmp", + "block_timestamp::date" + ) %} +{% endif %} + +WITH base AS ( + SELECT + * + FROM + silver.swaps_intermediate_raydium_cpmm__intermediate_tmp +), +decoded AS ( + SELECT + block_timestamp, + block_id, + tx_id, + INDEX, + inner_index, + COALESCE(LEAD(inner_index) OVER (PARTITION BY tx_id, index + ORDER BY inner_index) -1, 999999 + ) AS inner_index_end, + program_id, + silver.udf_get_account_pubkey_by_name('payer', decoded_instruction:accounts) as swapper, + silver.udf_get_account_pubkey_by_name('inputTokenAccount', decoded_instruction:accounts) as source_token_account, + null as source_mint, + null as destination_mint, + silver.udf_get_account_pubkey_by_name('outputTokenAccount', decoded_instruction:accounts) as destination_token_account, + silver.udf_get_account_pubkey_by_name('outputVault', decoded_instruction:accounts) as program_destination_token_account, + silver.udf_get_account_pubkey_by_name('inputVault', decoded_instruction:accounts) as program_source_token_account, + _inserted_timestamp + FROM + base +), +transfers AS ( + SELECT + A.*, + COALESCE(SPLIT_PART(INDEX :: text, '.', 1) :: INT, INDEX :: INT) AS index_1, + NULLIF(SPLIT_PART(INDEX :: text, '.', 2), '') :: INT AS inner_index_1 + FROM + {{ ref('silver__transfers') }} A + INNER JOIN ( + SELECT + DISTINCT tx_id, + block_timestamp::DATE AS block_date + FROM + decoded + ) d + ON d.block_date = A.block_timestamp::DATE + AND d.tx_id = A.tx_id + WHERE + A.succeeded + AND {{ between_stmts }} +), +pre_final AS ( + SELECT + A.block_id, + A.block_timestamp, + A.program_id, + A.tx_id, + A.index, + A.inner_index, + A.inner_index_end, + C.succeeded, + A.swapper, + b.amount AS swap_from_amount, + b.mint AS swap_from_mint, + C.amount AS swap_to_amount, + C.mint AS swap_to_mint, + A._inserted_timestamp + FROM + decoded A + LEFT JOIN transfers b + ON A.tx_id = b.tx_id + AND A.source_token_account = b.source_token_account + AND A.program_source_token_account = b.dest_token_account + AND A.index = b.index_1 + AND ( + (b.inner_index_1 BETWEEN A.inner_index AND A.inner_index_end) + OR A.inner_index IS NULL + ) + LEFT JOIN transfers C + ON A.tx_id = C.tx_id + AND A.destination_token_account = C.dest_token_account + AND A.program_destination_token_account = C.source_token_account + AND A.index = C.index_1 + AND ( + (C.inner_index_1 BETWEEN A.inner_index AND A.inner_index_end) + OR A.inner_index IS NULL + ) + QUALIFY ROW_NUMBER() over (PARTITION BY A.tx_id, A.index, A.inner_INDEX ORDER BY inner_index) = 1 +) +SELECT + block_id, + block_timestamp, + program_id, + tx_id, + succeeded, + ROW_NUMBER() over ( + PARTITION BY tx_id + ORDER BY + INDEX, + inner_index + ) AS swap_index, + index, + inner_index, + swapper, + swap_from_amount, + swap_from_mint, + swap_to_amount, + swap_to_mint, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key(['tx_id','swap_index','program_id']) }} AS swaps_intermediate_raydium_cpmm_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + pre_final diff --git a/models/silver/swaps/raydium/silver__swaps_intermediate_raydium_cpmm.yml b/models/silver/swaps/raydium/silver__swaps_intermediate_raydium_cpmm.yml new file mode 100644 index 000000000..23fcb9f45 --- /dev/null +++ b/models/silver/swaps/raydium/silver__swaps_intermediate_raydium_cpmm.yml @@ -0,0 +1,104 @@ +version: 2 +models: + - name: silver__swaps_intermediate_raydium_cpmm + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - TX_ID + - SWAP_INDEX + - PROGRAM_ID + where: block_timestamp::date > current_date - 30 + - compare_model_subset: + name: silver__swaps_intermediate_raydium_cpmm_business_logic_test + compare_model: ref('testing__swaps_intermediate_raydium_cpmm') + compare_columns: + - tx_id + - swapper + - round(swap_from_amount,8) + - swap_from_mint + - round(swap_to_amount,8) + - swap_to_mint + - swap_index + model_condition: "where tx_id in ('3o7EsrU2JaxUGcdDzLEUXAptsaq9M2xeub6RJMcXZ1zfEmsNs4QzLGFD64j3L3KpMgq7y43VrxwL2ZxRZd1tx5S5', + '5c8f2zz7WVULKouc4U9jmikkmnY1C9eFMBRvAb3wD7JB1uckLdNGmWF2tv8ZJhzEUKHiAmkzQR25YhqMbT6fy4NT' + )" + recent_date_filter: &recent_date_filter + config: + where: _inserted_timestamp >= current_date - 7 + columns: + - name: BLOCK_TIMESTAMP + description: "{{ doc('block_timestamp') }}" + tests: + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 2 + - not_null: *recent_date_filter + - name: BLOCK_ID + description: "{{ doc('block_id') }}" + tests: + - not_null: *recent_date_filter + - name: TX_ID + description: "{{ doc('tx_id') }}" + tests: + - not_null: *recent_date_filter + - name: SUCCEEDED + description: "{{ doc('tx_succeeded') }}" + tests: + - not_null: *recent_date_filter + - name: INDEX + description: "{{ doc('index') }}" + tests: + - not_null: *recent_date_filter + - name: INNER_INDEX + description: "{{ doc('inner_index') }}" + - name: PROGRAM_ID + description: "{{ doc('program_id') }}" + tests: + - not_null: *recent_date_filter + - name: SWAPPER + description: "{{ doc('swaps_swapper') }}" + tests: + - not_null: + where: succeeded = TRUE + - name: SWAP_FROM_AMOUNT + description: "{{ doc('swaps_from_amt') }}" + tests: + - not_null: *recent_date_filter + - name: SWAP_FROM_MINT + description: "{{ doc('swaps_from_mint') }}" + tests: + - not_null: *recent_date_filter + - name: SWAP_TO_AMOUNT + description: "{{ doc('swaps_to_amt') }}" + tests: + - not_null: *recent_date_filter + - name: SWAP_TO_MINT + description: "{{ doc('swaps_to_mint') }}" + tests: + - not_null: *recent_date_filter + - name: SWAP_INDEX + description: "{{ doc('swaps_swap_index') }}" + tests: + - not_null: *recent_date_filter + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" + tests: + - not_null + - name: SWAPS_INTERMEDIATE_RAYDIUM_CPMM_ID + description: '{{ doc("pk") }}' + tests: + - unique: *recent_date_filter + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + tests: + - not_null: *recent_date_filter + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' + tests: + - not_null: *recent_date_filter + - name: _INVOCATION_ID + description: '{{ doc("_invocation_id") }}' + tests: + - not_null: + name: test_silver__not_null_swaps_intermediate_raydium_cpmm__invocation_id + <<: *recent_date_filter \ No newline at end of file