Skip to content

Commit

Permalink
latency (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
austinFlipside authored Nov 10, 2023
1 parent da8351f commit 027551d
Show file tree
Hide file tree
Showing 15 changed files with 868 additions and 633 deletions.
44 changes: 44 additions & 0 deletions .github/workflows/dbt_run_scheduled_curated.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Workflow: dbt_run_scheduled_curated
# Runs the dbt models tagged `curated` for the gnosis_models project against
# the production Snowflake environment. Scheduling is driven externally
# (see data/github_actions__workflows.csv: "30 * * * *"); this workflow is
# invoked on demand via workflow_dispatch.
name: dbt_run_scheduled_curated
run-name: dbt_run_scheduled_curated

on:
  workflow_dispatch:
    # NOTE(review): `branches` is not a valid filter for workflow_dispatch and
    # is ignored by GitHub Actions; kept for parity with the original file,
    # but it can be removed — dispatch always runs the ref you select.
    branches:
      - "main"

env:
  DBT_PROFILES_DIR: ./

  # Snowflake connection settings; non-sensitive values come from repository
  # variables, the password from an encrypted secret.
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

# Serialize runs of this workflow so two curated builds never overlap
# (delete+insert incremental models are not safe to run concurrently).
concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod

    steps:
      - uses: actions/checkout@v3

      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"

      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps

      - name: Run DBT Jobs
        run: |
          dbt run -m "gnosis_models,tag:curated"
3 changes: 2 additions & 1 deletion data/github_actions__workflows.csv
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ workflow_name,workflow_schedule
dbt_run_scheduled_non_realtime,"10,40 * * * *"
dbt_run_streamline_chainhead,"0,30 * * * *"
dbt_run_streamline_decoder,"20,50 * * * *"
dbt_test_tasks,"25,55 * * * *"
dbt_run_scheduled_curated,"30 * * * *"
dbt_test_tasks,"25 * * * *"
282 changes: 139 additions & 143 deletions models/silver/dex/balancer/silver_dex__balancer_pools.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
full_refresh = false,
tags = ['non_realtime']
tags = ['curated']
) }}

WITH pools_registered AS (
Expand All @@ -15,15 +15,23 @@ WITH pools_registered AS (
tx_hash,
contract_address,
topics [1] :: STRING AS pool_id,
SUBSTR(topics [1] :: STRING,1,42) AS pool_address,
SUBSTR(
topics [1] :: STRING,
1,
42
) AS pool_address,
_log_id,
_inserted_timestamp,
ROW_NUMBER() OVER (ORDER BY pool_address) AS row_num
ROW_NUMBER() over (
ORDER BY
pool_address
) AS row_num
FROM
{{ ref('silver__logs') }}
WHERE
topics[0]::STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered
topics [0] :: STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'

{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
Expand All @@ -32,120 +40,91 @@ AND _inserted_timestamp >= (
{{ this }}
)
{% endif %}

),

tokens_registered AS (

SELECT
block_number,
block_timestamp,
event_index,
tx_hash,
contract_address,
decoded_flat :poolId :: STRING AS pool_id,
decoded_flat :tokens AS tokens,
tokens[0] :: STRING AS token0,
tokens[1] :: STRING AS token1,
tokens[2] :: STRING AS token2,
tokens[3] :: STRING AS token3,
tokens[4] :: STRING AS token4,
tokens[5] :: STRING AS token5,
tokens[6] :: STRING AS token6,
tokens[7] :: STRING AS token7,
decoded_flat :assetManagers AS asset_managers,
_log_id,
_inserted_timestamp
FROM {{ ref('silver__decoded_logs') }}
WHERE
topics[0]::STRING = '0xf5847d3f2197b16cdcd2098ec95d0905cd1abdaf415f07bb7cef2bba8ac5dec4' --TokensRegistered
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'
AND tx_hash IN (
SELECT
tx_hash
FROM
pools_registered
)
SELECT
block_number,
block_timestamp,
event_index,
tx_hash,
contract_address,
decoded_flat :poolId :: STRING AS pool_id,
decoded_flat :tokens AS tokens,
tokens [0] :: STRING AS token0,
tokens [1] :: STRING AS token1,
tokens [2] :: STRING AS token2,
tokens [3] :: STRING AS token3,
tokens [4] :: STRING AS token4,
tokens [5] :: STRING AS token5,
tokens [6] :: STRING AS token6,
tokens [7] :: STRING AS token7,
decoded_flat :assetManagers AS asset_managers,
_log_id,
_inserted_timestamp
FROM
{{ ref('silver__decoded_logs') }}
WHERE
topics [0] :: STRING = '0xf5847d3f2197b16cdcd2098ec95d0905cd1abdaf415f07bb7cef2bba8ac5dec4' --TokensRegistered
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'
AND tx_hash IN (
SELECT
tx_hash
FROM
pools_registered
)
),

function_sigs AS (

SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
SELECT
'0x06fdde03' AS function_sig,
'name' AS function_name
UNION ALL
SELECT
'0x95d89b41' AS function_sig,
'symbol' AS function_name
UNION ALL
SELECT
'0x313ce567' AS function_sig,
'decimals' AS function_name
),

inputs_pools AS (

SELECT
pool_address,
block_number,
function_sig,
(ROW_NUMBER() OVER (PARTITION BY pool_address
ORDER BY block_number)) - 1 AS function_input
FROM pools_registered
JOIN function_sigs ON 1=1
),

pool_token_reads AS (

{% for item in range(50) %}
(
SELECT
ethereum.streamline.udf_json_rpc_read_calls(
node_url,
headers,
PARSE_JSON(batch_read)
) AS read_output,
SYSDATE() AS _inserted_timestamp
FROM (
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM (
SELECT
pool_address,
block_number,
function_sig,
function_input,
CONCAT(
'[\'',
pool_address,
'\',',
block_number,
',\'',
function_sig,
'\',\'',
function_input,
'\']'
) AS read_input,
row_num
FROM inputs_pools
LEFT JOIN pools_registered USING(pool_address)
) ready_reads_pools
WHERE row_num BETWEEN {{ item * 50 + 1 }} AND {{ (item + 1) * 50}}
) batch_reads_pools
JOIN {{ source(
'streamline_crosschain',
'node_mapping'
) }} ON 1=1
AND chain = 'gnosis'
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
pool_address,
block_number,
function_sig,
(ROW_NUMBER() over (PARTITION BY pool_address
ORDER BY
block_number)) - 1 AS function_input
FROM
pools_registered
JOIN function_sigs
ON 1 = 1
),

reads_adjusted AS (

pool_token_reads AS ({% for item in range(50) %}
(
SELECT
ethereum.streamline.udf_json_rpc_read_calls(node_url, headers, PARSE_JSON(batch_read)) AS read_output, SYSDATE() AS _inserted_timestamp
FROM
(
SELECT
CONCAT('[', LISTAGG(read_input, ','), ']') AS batch_read
FROM
(
SELECT
pool_address, block_number, function_sig, function_input, CONCAT('[\'', pool_address, '\',', block_number, ',\'', function_sig, '\',\'', function_input, '\']') AS read_input, row_num
FROM
inputs_pools
LEFT JOIN pools_registered USING(pool_address)) ready_reads_pools
WHERE
row_num BETWEEN {{ item * 50 + 1 }}
AND {{(item + 1) * 50 }}) batch_reads_pools
JOIN {{ source('streamline_crosschain', 'node_mapping') }}
ON 1 = 1
AND chain = 'gnosis') {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}),
reads_adjusted AS (
SELECT
VALUE :id :: STRING AS read_id,
VALUE :result :: STRING AS read_result,
SPLIT(
Expand All @@ -157,40 +136,53 @@ SELECT
read_id_object [2] :: STRING AS function_sig,
read_id_object [3] :: STRING AS function_input,
_inserted_timestamp
FROM
pool_token_reads,
LATERAL FLATTEN(
input => read_output [0] :data
)
FROM
pool_token_reads,
LATERAL FLATTEN(
input => read_output [0] :data
)
),

pool_details AS (

SELECT
pool_address,
function_sig,
function_name,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
_inserted_timestamp
FROM reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
),

SELECT
pool_address,
function_sig,
function_name,
read_result,
regexp_substr_all(SUBSTR(read_result, 3, len(read_result)), '.{64}') AS segmented_output,
_inserted_timestamp
FROM
reads_adjusted
LEFT JOIN function_sigs USING(function_sig)
),
FINAL AS (
SELECT
pool_address,
MIN(CASE WHEN function_name = 'symbol' THEN utils.udf_hex_to_string(segmented_output [2] :: STRING) END) AS pool_symbol,
MIN(CASE WHEN function_name = 'name' THEN utils.udf_hex_to_string(segmented_output [2] :: STRING) END) AS pool_name,
MIN(CASE
WHEN read_result::STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(read_result::STRING,66))
END)::INTEGER AS pool_decimals,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM pool_details
GROUP BY 1
SELECT
pool_address,
MIN(
CASE
WHEN function_name = 'symbol' THEN utils.udf_hex_to_string(
segmented_output [2] :: STRING
)
END
) AS pool_symbol,
MIN(
CASE
WHEN function_name = 'name' THEN utils.udf_hex_to_string(
segmented_output [2] :: STRING
)
END
) AS pool_name,
MIN(
CASE
WHEN read_result :: STRING = '0x' THEN NULL
ELSE utils.udf_hex_to_int(LEFT(read_result :: STRING, 66))
END
) :: INTEGER AS pool_decimals,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
pool_details
GROUP BY
1
)

SELECT
p.block_number,
p.block_timestamp,
Expand All @@ -213,7 +205,11 @@ SELECT
t.asset_managers,
p._log_id,
f._inserted_timestamp
FROM FINAL f
LEFT JOIN pools_registered p ON f.pool_address = p.pool_address
LEFT JOIN tokens_registered t ON p.pool_id = t.pool_id
WHERE t.token0 IS NOT NULL
FROM
FINAL f
LEFT JOIN pools_registered p
ON f.pool_address = p.pool_address
LEFT JOIN tokens_registered t
ON p.pool_id = t.pool_id
WHERE
t.token0 IS NOT NULL
2 changes: 1 addition & 1 deletion models/silver/dex/balancer/silver_dex__balancer_swaps.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime','reorg']
tags = ['curated','reorg']
) }}

WITH pools AS (
Expand Down
Loading

0 comments on commit 027551d

Please sign in to comment.