Skip to content

Commit

Permalink
raydium cpmm swaps (#676)
Browse files Browse the repository at this point in the history
* raydium cpmm swaps

* remove filter
  • Loading branch information
tarikceric authored Oct 17, 2024
1 parent 7021e4c commit 0f8c825
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 0 deletions.
3 changes: 3 additions & 0 deletions data/testing__swaps_intermediate_raydium_cpmm.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
tx_id,swapper,swap_from_amount,swap_from_mint,swap_to_amount,swap_to_mint,swap_index
3o7EsrU2JaxUGcdDzLEUXAptsaq9M2xeub6RJMcXZ1zfEmsNs4QzLGFD64j3L3KpMgq7y43VrxwL2ZxRZd1tx5S5,5JGSn76EcwF1dkiF213iptW5AaKQwYnnY6WKaTbv7eyE,0.001343562,So11111111111111111111111111111111111111112,6501189.665,HECZgs7unJtJ2626a1FV4BnmVr5BphuZWobVavdmpYnY,1
5c8f2zz7WVULKouc4U9jmikkmnY1C9eFMBRvAb3wD7JB1uckLdNGmWF2tv8ZJhzEUKHiAmkzQR25YhqMbT6fy4NT,8ZnvqGzD8AeJ158k4bMYNkbELn35x6uWoZZogGHArpeQ,1.0031e-05,So11111111111111111111111111111111111111112,0.001231,3Ae4HCpJqYdDSY4eHsSZyYhwp1zy2oerk5CFpV26eaMx,1
22 changes: 22 additions & 0 deletions models/gold/defi/defi__fact_swaps.sql
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,28 @@ FROM
WHERE
modified_timestamp >= '{{ max_modified_timestamp }}'
{% endif %}
UNION ALL
SELECT
block_timestamp,
block_id,
tx_id,
succeeded,
swapper,
swap_from_amount,
swap_from_mint,
swap_to_amount,
swap_to_mint,
program_id,
swap_index,
swaps_intermediate_raydium_cpmm_id as fact_swaps_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__swaps_intermediate_raydium_cpmm') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= '{{ max_modified_timestamp }}'
{% endif %}
)

select
Expand Down
1 change: 1 addition & 0 deletions models/gold/defi/defi__fact_swaps.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ models:
- 'silver__swaps_intermediate_raydium_stable'
- 'silver__swaps_intermediate_raydium_v4_amm'
- 'silver__swaps_pumpfun'
- 'silver__swaps_intermediate_raydium_cpmm'
id_column: 'tx_id'
columns:
- name: BLOCK_TIMESTAMP
Expand Down
10 changes: 10 additions & 0 deletions models/silver/parser/silver__decoded_instructions_combined.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ models:
AND succeeded
and _inserted_timestamp between current_date - 7 and current_timestamp() - INTERVAL '4 HOUR'
to_condition: "_inserted_timestamp >= current_date - 7"
- dbt_utils.relationships_where:
name: dbt_utils_relationships_where_silver__decoded_instructions_combined_swaps_intermediate_raydium_cpmm_tx_id
to: ref('silver__swaps_intermediate_raydium_cpmm')
field: tx_id
from_condition: >
program_id = 'CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C'
AND event_type IN ('swapBaseInput','swapBaseOutput')
AND succeeded
and _inserted_timestamp between current_date - 7 and current_timestamp() - INTERVAL '4 HOUR'
to_condition: "_inserted_timestamp >= current_date - 7"
- name: SIGNERS
description: "{{ doc('signers') }}"
- name: SUCCEEDED
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
-- depends_on: {{ ref('silver__decoded_instructions_combined') }}

{{ config(
materialized = 'incremental',
unique_key = ['swaps_intermediate_raydium_cpmm_id'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE'],
tags = ['scheduled_non_core']
) }}

{% if execute %}
{% set base_query %}
CREATE OR REPLACE TEMPORARY TABLE silver.swaps_intermediate_raydium_cpmm__intermediate_tmp AS
SELECT
block_timestamp,
block_id,
tx_id,
succeeded,
INDEX,
inner_index,
program_id,
event_type,
decoded_instruction,
_inserted_timestamp
FROM
{{ ref('silver__decoded_instructions_combined') }}
WHERE
program_id = 'CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C'
AND event_type in ('swapBaseInput','swapBaseOutput')

{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '1 hour'
FROM
{{ this }}
)
{% else %}
AND _inserted_timestamp::DATE >= '2024-10-11'
{% endif %}
{% endset %}

{% do run_query(base_query) %}
{% set between_stmts = fsc_utils.dynamic_range_predicate(
"silver.swaps_intermediate_raydium_cpmm__intermediate_tmp",
"block_timestamp::date"
) %}
{% endif %}

WITH base AS (
SELECT
*
FROM
silver.swaps_intermediate_raydium_cpmm__intermediate_tmp
),
decoded AS (
SELECT
block_timestamp,
block_id,
tx_id,
INDEX,
inner_index,
COALESCE(LEAD(inner_index) OVER (PARTITION BY tx_id, index
ORDER BY inner_index) -1, 999999
) AS inner_index_end,
program_id,
silver.udf_get_account_pubkey_by_name('payer', decoded_instruction:accounts) as swapper,
silver.udf_get_account_pubkey_by_name('inputTokenAccount', decoded_instruction:accounts) as source_token_account,
null as source_mint,
null as destination_mint,
silver.udf_get_account_pubkey_by_name('outputTokenAccount', decoded_instruction:accounts) as destination_token_account,
silver.udf_get_account_pubkey_by_name('outputVault', decoded_instruction:accounts) as program_destination_token_account,
silver.udf_get_account_pubkey_by_name('inputVault', decoded_instruction:accounts) as program_source_token_account,
_inserted_timestamp
FROM
base
),
transfers AS (
SELECT
A.*,
COALESCE(SPLIT_PART(INDEX :: text, '.', 1) :: INT, INDEX :: INT) AS index_1,
NULLIF(SPLIT_PART(INDEX :: text, '.', 2), '') :: INT AS inner_index_1
FROM
{{ ref('silver__transfers') }} A
INNER JOIN (
SELECT
DISTINCT tx_id,
block_timestamp::DATE AS block_date
FROM
decoded
) d
ON d.block_date = A.block_timestamp::DATE
AND d.tx_id = A.tx_id
WHERE
A.succeeded
AND {{ between_stmts }}
),
pre_final AS (
SELECT
A.block_id,
A.block_timestamp,
A.program_id,
A.tx_id,
A.index,
A.inner_index,
A.inner_index_end,
C.succeeded,
A.swapper,
b.amount AS swap_from_amount,
b.mint AS swap_from_mint,
C.amount AS swap_to_amount,
C.mint AS swap_to_mint,
A._inserted_timestamp
FROM
decoded A
LEFT JOIN transfers b
ON A.tx_id = b.tx_id
AND A.source_token_account = b.source_token_account
AND A.program_source_token_account = b.dest_token_account
AND A.index = b.index_1
AND (
(b.inner_index_1 BETWEEN A.inner_index AND A.inner_index_end)
OR A.inner_index IS NULL
)
LEFT JOIN transfers C
ON A.tx_id = C.tx_id
AND A.destination_token_account = C.dest_token_account
AND A.program_destination_token_account = C.source_token_account
AND A.index = C.index_1
AND (
(C.inner_index_1 BETWEEN A.inner_index AND A.inner_index_end)
OR A.inner_index IS NULL
)
QUALIFY ROW_NUMBER() over (PARTITION BY A.tx_id, A.index, A.inner_INDEX ORDER BY inner_index) = 1
)
SELECT
block_id,
block_timestamp,
program_id,
tx_id,
succeeded,
ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
INDEX,
inner_index
) AS swap_index,
index,
inner_index,
swapper,
swap_from_amount,
swap_from_mint,
swap_to_amount,
swap_to_mint,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['tx_id','swap_index','program_id']) }} AS swaps_intermediate_raydium_cpmm_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
pre_final
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
version: 2
models:
- name: silver__swaps_intermediate_raydium_cpmm
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
- SWAP_INDEX
- PROGRAM_ID
where: block_timestamp::date > current_date - 30
- compare_model_subset:
name: silver__swaps_intermediate_raydium_cpmm_business_logic_test
compare_model: ref('testing__swaps_intermediate_raydium_cpmm')
compare_columns:
- tx_id
- swapper
- round(swap_from_amount,8)
- swap_from_mint
- round(swap_to_amount,8)
- swap_to_mint
- swap_index
model_condition: "where tx_id in ('3o7EsrU2JaxUGcdDzLEUXAptsaq9M2xeub6RJMcXZ1zfEmsNs4QzLGFD64j3L3KpMgq7y43VrxwL2ZxRZd1tx5S5',
'5c8f2zz7WVULKouc4U9jmikkmnY1C9eFMBRvAb3wD7JB1uckLdNGmWF2tv8ZJhzEUKHiAmkzQR25YhqMbT6fy4NT'
)"
recent_date_filter: &recent_date_filter
config:
where: _inserted_timestamp >= current_date - 7
columns:
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- not_null: *recent_date_filter
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null: *recent_date_filter
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null: *recent_date_filter
- name: SUCCEEDED
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null: *recent_date_filter
- name: INDEX
description: "{{ doc('index') }}"
tests:
- not_null: *recent_date_filter
- name: INNER_INDEX
description: "{{ doc('inner_index') }}"
- name: PROGRAM_ID
description: "{{ doc('program_id') }}"
tests:
- not_null: *recent_date_filter
- name: SWAPPER
description: "{{ doc('swaps_swapper') }}"
tests:
- not_null:
where: succeeded = TRUE
- name: SWAP_FROM_AMOUNT
description: "{{ doc('swaps_from_amt') }}"
tests:
- not_null: *recent_date_filter
- name: SWAP_FROM_MINT
description: "{{ doc('swaps_from_mint') }}"
tests:
- not_null: *recent_date_filter
- name: SWAP_TO_AMOUNT
description: "{{ doc('swaps_to_amt') }}"
tests:
- not_null: *recent_date_filter
- name: SWAP_TO_MINT
description: "{{ doc('swaps_to_mint') }}"
tests:
- not_null: *recent_date_filter
- name: SWAP_INDEX
description: "{{ doc('swaps_swap_index') }}"
tests:
- not_null: *recent_date_filter
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- name: SWAPS_INTERMEDIATE_RAYDIUM_CPMM_ID
description: '{{ doc("pk") }}'
tests:
- unique: *recent_date_filter
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
tests:
- not_null: *recent_date_filter
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
tests:
- not_null: *recent_date_filter
- name: _INVOCATION_ID
description: '{{ doc("_invocation_id") }}'
tests:
- not_null:
name: test_silver__not_null_swaps_intermediate_raydium_cpmm__invocation_id
<<: *recent_date_filter

0 comments on commit 0f8c825

Please sign in to comment.