Skip to content

Commit

Permalink
AN-5568/sl-2 (#235)
Browse files Browse the repository at this point in the history
* initial core streamline model updates for sl2

* dbt project vars, core bronze views and fsc-evm temp macros

* revert

* spacing

* external function uri stg

* sources and references to new table names

* workflow cmds

* chainhead test

* vars clean up

* update api integration

* added prod integration, cleaned up vars, param for silver.traces2

* remove goerli models

* block delay

* var for delay

* remove order by
  • Loading branch information
drethereum authored Dec 10, 2024
1 parent 4814d90 commit ce3d34c
Show file tree
Hide file tree
Showing 96 changed files with 2,753 additions and 2,507 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/dbt_run_streamline_chainhead.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "base_models,tag:streamline_core_complete" "base_models,tag:streamline_core_realtime"
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "base_models,tag:streamline_core_complete" "base_models,tag:streamline_core_realtime" "base_models,tag:streamline_core_complete_receipts" "base_models,tag:streamline_core_realtime_receipts" "base_models,tag:streamline_core_complete_confirm_blocks" "base_models,tag:streamline_core_realtime_confirm_blocks"
- name: Run Chainhead Tests
run: |
dbt test -m "base_models,tag:chainhead"
2 changes: 1 addition & 1 deletion .github/workflows/dbt_run_streamline_history_adhoc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ on:
description: 'DBT Run Command'
required: true
options:
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "base_models,tag:streamline_core_complete" "base_models,tag:streamline_core_history"
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "base_models,tag:streamline_core_complete" "base_models,tag:streamline_core_history" "base_models,tag:streamline_core_complete_receipts" "base_models,tag:streamline_core_history_receipts" "base_models,tag:streamline_core_complete_confirm_blocks" "base_models,tag:streamline_core_history_confirm_blocks"
- dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120}' -m "base_models,tag:streamline_decoded_logs_complete" "base_models,tag:streamline_decoded_logs_history"

env:
Expand Down
54 changes: 53 additions & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,56 @@ vars:
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
["INTERNAL_DEV"]
#### STREAMLINE 2.0 BEGIN ####

API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
config:
# The keys correspond to dbt profiles and are case sensitive
dev:
API_INTEGRATION: AWS_BASE_API_STG_V2
EXTERNAL_FUNCTION_URI: p2bt501b4d.execute-api.us-east-1.amazonaws.com/stg/
ROLES:
- AWS_LAMBDA_BASE_API
- INTERNAL_DEV

prod:
API_INTEGRATION: AWS_BASE_API_PROD_V2
EXTERNAL_FUNCTION_URI: 6zxz2oxkwk.execute-api.us-east-1.amazonaws.com/prod/
ROLES:
- AWS_LAMBDA_BASE_API
- INTERNAL_DEV
- DBT_CLOUD_BASE

#### STREAMLINE 2.0 END ####

#### FSC_EVM BEGIN ####
# Visit https://github.com/FlipsideCrypto/fsc-evm/wiki for more information on required and optional variables

### GLOBAL VARIABLES BEGIN ###
## REQUIRED
GLOBAL_PROD_DB_NAME: 'base'
GLOBAL_NODE_SECRET_PATH: 'Vault/prod/base/quicknode/base_mainnet'
GLOBAL_BLOCKS_PER_HOUR: 1800
GLOBAL_USES_STREAMLINE_V1: True
GLOBAL_USES_SINGLE_FLIGHT_METHOD: True

### GLOBAL VARIABLES END ###

### MAIN_PACKAGE VARIABLES BEGIN ###

### CORE ###
## REQUIRED

## OPTIONAL
# GOLD_FULL_REFRESH: True
# SILVER_FULL_REFRESH: True

### MAIN_PACKAGE VARIABLES END ###

#### FSC_EVM END ####
226 changes: 226 additions & 0 deletions macros/fsc_evm_temp/_legacy/silver_traces.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
{% macro silver_traces_v1(
full_reload_start_block,
full_reload_blocks,
full_reload_mode = false,
TRACES_ARB_MODE = false,
TRACES_SEI_MODE = false,
TRACES_KAIA_MODE = false,
use_partition_key = false,
schema_name = 'bronze'
) %}
WITH bronze_traces AS (
SELECT
block_number,
{% if use_partition_key %}
partition_key,
{% else %}
_partition_by_block_id AS partition_key,
{% endif %}

VALUE :array_index :: INT AS tx_position,
DATA :result AS full_traces,
{% if TRACES_SEI_MODE %}
DATA :txHash :: STRING AS tx_hash,
{% endif %}
_inserted_timestamp
FROM

{% if is_incremental() and not full_reload_mode %}
{{ ref(
schema_name ~ '__traces'
) }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
AND DATA :result IS NOT NULL {% if TRACES_ARB_MODE %}
AND block_number > 22207817
{% endif %}

{% elif is_incremental() and full_reload_mode %}
{{ ref(
schema_name ~ '__traces_fr'
) }}
WHERE
{% if use_partition_key %}
partition_key BETWEEN (
SELECT
MAX(partition_key) - 100000
FROM
{{ this }}
)
AND (
SELECT
MAX(partition_key) + {{ full_reload_blocks }}
FROM
{{ this }}
)
{% else %}
_partition_by_block_id BETWEEN (
SELECT
MAX(_partition_by_block_id) - 100000
FROM
{{ this }}
)
AND (
SELECT
MAX(_partition_by_block_id) + {{ full_reload_blocks }}
FROM
{{ this }}
)
{% endif %}

{% if TRACES_ARB_MODE %}
AND block_number > 22207817
{% endif %}
{% else %}
{{ ref(
schema_name ~ '__traces_fr'
) }}
WHERE
{% if use_partition_key %}
partition_key <= {{ full_reload_start_block }}
{% else %}
_partition_by_block_id <= {{ full_reload_start_block }}
{% endif %}

{% if TRACES_ARB_MODE %}
AND block_number > 22207817
{% endif %}
{% endif %}

qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position
ORDER BY
_inserted_timestamp DESC)) = 1
),
flatten_traces AS (
SELECT
block_number,
{% if TRACES_SEI_MODE %}
tx_hash,
{% else %}
tx_position,
{% endif %}
partition_key,
IFF(
path IN (
'result',
'result.value',
'result.type',
'result.to',
'result.input',
'result.gasUsed',
'result.gas',
'result.from',
'result.output',
'result.error',
'result.revertReason',
'result.time',
'gasUsed',
'gas',
'type',
'to',
'from',
'value',
'input',
'error',
'output',
'time',
'revertReason'
{% if TRACES_ARB_MODE %},
'afterEVMTransfers',
'beforeEVMTransfers',
'result.afterEVMTransfers',
'result.beforeEVMTransfers'
{% endif %}
{% if TRACES_KAIA_MODE %},
'reverted',
'result.reverted'
{% endif %}
),
'ORIGIN',
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '')
) AS trace_address,
_inserted_timestamp,
OBJECT_AGG(
key,
VALUE
) AS trace_json,
CASE
WHEN trace_address = 'ORIGIN' THEN NULL
WHEN POSITION(
'_' IN trace_address
) = 0 THEN 'ORIGIN'
ELSE REGEXP_REPLACE(
trace_address,
'_[0-9]+$',
'',
1,
1
)
END AS parent_trace_address,
SPLIT(
trace_address,
'_'
) AS trace_address_array
FROM
bronze_traces txs,
TABLE(
FLATTEN(
input => PARSE_JSON(
txs.full_traces
),
recursive => TRUE
)
) f
WHERE
f.index IS NULL
AND f.key != 'calls'
AND f.path != 'result'
{% if TRACES_ARB_MODE %}
AND f.path NOT LIKE 'afterEVMTransfers[%'
AND f.path NOT LIKE 'beforeEVMTransfers[%'
{% endif %}
{% if TRACES_KAIA_MODE %}
and f.key not in ('message', 'contract')
{% endif %}
GROUP BY
block_number,
{% if TRACES_SEI_MODE %}
tx_hash,
{% else %}
tx_position,
{% endif %}
partition_key,
trace_address,
_inserted_timestamp
)
SELECT
block_number,
{% if TRACES_SEI_MODE %}
tx_hash,
{% else %}
tx_position,
{% endif %}
trace_address,
parent_trace_address,
trace_address_array,
trace_json,
partition_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number'] +
(['tx_hash'] if TRACES_SEI_MODE else ['tx_position']) +
['trace_address']
) }} AS traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
flatten_traces qualify(ROW_NUMBER() over(PARTITION BY traces_id
ORDER BY
_inserted_timestamp DESC)) = 1
{% endmacro %}
Loading

0 comments on commit ce3d34c

Please sign in to comment.