Skip to content

Commit

Permalink
Merge Main
Browse files Browse the repository at this point in the history
  • Loading branch information
forgxyz committed Nov 18, 2024
2 parents 76628b0 + 4df472b commit e43a36d
Show file tree
Hide file tree
Showing 28 changed files with 667 additions and 11 deletions.
19 changes: 15 additions & 4 deletions .github/workflows/dbt_run_scheduled_reward_points_realtime.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: dbt_run_scheduled_reward_points_api
run-name: dbt_run_scheduled_reward_points_api
name: dbt_run_scheduled_reward_points_realtime
run-name: dbt_run_scheduled_reward_points_realtime

on:
workflow_dispatch:
Expand All @@ -19,6 +19,9 @@ env:
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
FLOW_POINTS_URL: "${{ secrets.FLOW_POINTS_URL }}"
PRIVATE_KEY: "${{ secrets.PRIVATE_KEY }}"
PUBLIC_ADDRESS: "${{ secrets.PUBLIC_ADDRESS }}"

concurrency:
group: ${{ github.workflow }}
Expand All @@ -40,9 +43,17 @@ jobs:
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
- name: Request User Points Balances
run: >
dbt run -s tag:streamline_non_core --vars '{"STREAMLINE_INVOKE_STREAMS": True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": False}'
dbt run -s streamline__reward_points_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Authenticate with Flow Points API
run: |
python python/points/authenticate.py
- name: Request Points Transfers
run: >
dbt run -s streamline__points_transfers_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Store logs
uses: actions/upload-artifact@v3
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/dbt_run_scheduled_reward_points_silver.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ jobs:
- name: Run DBT Jobs
run: >
dbt run -s silver_api__reward_points
dbt run -s tag:streamline_non_core
- name: Test DBT Models
run: >
dbt test -s silver_api__reward_points
dbt test -s tag:streamline_non_core
continue-on-error: true

- name: Log test results
Expand Down
2 changes: 1 addition & 1 deletion models/descriptions/address.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% docs address %}

The on-chain address. See more in the Flow docs on accounts and addresses: https://developers.flow.com/build/basics/accounts
The on-chain address. See more in the Flow docs on accounts and addresses: https://developers.flow.com/build/basics/accounts and https://developers.flow.com/evm/accounts

{% enddocs %}
30 changes: 30 additions & 0 deletions models/gold/core/core__dim_address_mapping.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{{ config (
materialized = 'incremental',
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
unique_key = 'dim_address_mapping_id',
cluster_by = ['block_timestamp_associated::date'],
post_hook = 'ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(evm_address, flow_address);',
tags = ['scheduled_non_core']
) }}

SELECT
block_timestamp AS block_timestamp_associated,
block_height AS block_height_associated,
flow_address,
evm_address,
flow_evm_address_map_id AS dim_address_mapping_id,
SYSDATE() AS modified_timestamp,
SYSDATE() AS inserted_timestamp
FROM
{{ ref('silver__flow_evm_address_map') }}

{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
46 changes: 46 additions & 0 deletions models/gold/core/core__dim_address_mapping.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
version: 2

models:
- name: core__dim_address_mapping
description: -|
A table that maps EVM addresses to Flow addresses based on COA Creation events. A single Flow address may have multiple EVM addresses associated with it.
tests:
- dbt_utils.recency:
datepart: hours
field: BLOCK_TIMESTAMP_ASSOCIATED
interval: 24

columns:
- name: BLOCK_HEIGHT_ASSOCIATED
description: "{{ doc('block_number') }}"
tests:
- not_null

- name: BLOCK_TIMESTAMP_ASSOCIATED
description: "{{ doc('block_timestamp') }}"
tests:
- not_null

- name: EVM_ADDRESS
description: "{{ doc('address') }}"
tests:
- not_null
- unique

- name: FLOW_ADDRESS
description: "{{ doc('address') }}"
tests:
- not_null

- name: DIM_ADDRESS_MAPPING_ID
description: "{{ doc('pk_id') }}"
tests:
- not_null
- unique

- name: INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"

- name: MODIFIED_TIMESTAMP
description: "{{ doc('modified_timestamp') }}"

123 changes: 123 additions & 0 deletions models/silver/curated/silver__flow_evm_address_map.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
{{ config(
materialized = 'incremental',
unique_key = 'flow_evm_address_map_id',
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::date', 'modified_timestamp::date'],
tags = ['scheduled_non_core']
) }}

WITH events AS (

SELECT
*
FROM
{{ ref('silver__streamline_events') }}
WHERE
block_timestamp :: DATE >= '2024-09-02'

{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
coa_creation AS (
SELECT
tx_id,
block_timestamp,
block_height,
event_index,
CONCAT(
'0x',
event_data :address :: STRING
) AS evm_address
FROM
events
WHERE
event_contract = 'A.e467b9dd11fa00df.EVM'
AND event_type = 'CadenceOwnedAccountCreated'
),
txs AS (
SELECT
tx_id,
block_height,
authorizers
FROM
{{ ref('silver__streamline_transactions_final') }}
WHERE
block_timestamp :: DATE >= '2024-09-02'
AND tx_id IN (
SELECT
tx_id
FROM
events
)

{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
get_flow_address AS (
SELECT
tx_id,
block_height,
event_index,
event_type,
event_data :address :: STRING AS flow_address
FROM
events
WHERE
event_contract = 'flow'
AND event_type = 'CapabilityPublished'
AND tx_id IN (
SELECT
tx_id
FROM
coa_creation
) -- a transaction may emit multiple CapabilityPublished events
qualify(ROW_NUMBER() over (PARTITION BY tx_id
ORDER BY
event_index) = 1)
),
map_addresses AS (
SELECT
A.tx_id,
A.block_timestamp,
A.block_height,
A.evm_address,
COALESCE(
b.flow_address,
C.authorizers [0] :: STRING
) AS flow_address,
b.flow_address IS NULL AS used_authorizer
FROM
coa_creation A
LEFT JOIN get_flow_address b
ON A.tx_id = b.tx_id
AND A.block_height = b.block_height
LEFT JOIN txs C
ON A.tx_id = C.tx_id
AND A.block_height = C.block_height
)
SELECT
tx_id,
block_timestamp,
block_height,
evm_address,
flow_address,
used_authorizer,
{{ dbt_utils.generate_surrogate_key(['evm_address', 'flow_address']) }} AS flow_evm_address_map_id,
SYSDATE() AS modified_timestamp,
SYSDATE() AS inserted_timestamp,
'{{ invocation_id }}' AS invocation_id
FROM
map_addresses
53 changes: 53 additions & 0 deletions models/silver/curated/silver__flow_evm_address_map.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
version: 2

models:
- name: silver__flow_evm_address_map
description: -|
A table that maps EVM addresses to Flow addresses based on COA Creation events.
tests:
- dbt_utils.recency:
datepart: hours
field: block_timestamp
interval: 24

columns:
- name: BLOCK_HEIGHT
description: "{{ doc('block_number') }}"
tests:
- not_null

- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null

- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null

- name: EVM_ADDRESS
description: "{{ doc('address') }}"
tests:
- not_null
- unique

- name: FLOW_ADDRESS
description: "{{ doc('address') }}"
tests:
- not_null

- name: FLOW_EVM_ADDRESS_MAP_ID
description: "{{ doc('pk_id') }}"
tests:
- not_null
- unique

- name: INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"

- name: MODIFIED_TIMESTAMP
description: "{{ doc('modified_timestamp') }}"

- name: _INVOCATION_ID
description: "{{ doc('invocation_id') }}"
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
unique_key = "reward_points_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['_inserted_timestamp :: DATE', 'address']
cluster_by = ['_inserted_timestamp :: DATE', 'address'],
tags = ['streamline_non_core']
) }}

SELECT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "reward_points",
"sql_limit": "1000",
"sql_limit": "10000",
"producer_batch_size": "1000",
"worker_batch_size": "1000",
"sql_source": "{{this.identifier}}"
}
),
tags = ['streamline_non_core']
)
) }}

WITH evm_addresses AS (
Expand Down
24 changes: 24 additions & 0 deletions models/silver/streamline/external/streamline__evm_addresses.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
version: 2

models:
- name: streamline__evm_addresses
description: "Table of unique EVM addresses."
data_tests:
- dbt_utils.recency:
datepart: day
field: modified_timestamp
interval: 1

columns:
- name: address
tests:
- not_null
- unique

- name: modified_timestamp

- name: inserted_timestamp

- name: evm_addresses_id

- name: _invocation_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['streamline_non_core']
) }}

{{ streamline_external_table_FR_query_v2(
model = "points_transfers",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['streamline_non_core']
) }}

{{ streamline_external_table_query_v2(
model = "points_transfers",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}
Loading

0 comments on commit e43a36d

Please sign in to comment.