Skip to content

Commit

Permalink
AN-5459/point transfers api integration (#378)
Browse files Browse the repository at this point in the history
* point transfers api integration

* upd 2 silver models

* gha workflow with auth

* upd gha workflow

* add gha on push config for testing

* add print step

* chg echo

* upd auth.py

* rm run on commit from gha

* upd model tags

* merge 2 silver into 1, add yml, upd gha

* upd auth return vals

* add exit 1 on failure

* upd return on success True

* add gha on-run config for final test

* add backup default to env_var(JWT)

* add backup default to env_var(JWT) - 2

* rm run on commit from gha and upd sql limit for balance call

* add yml for evm address model

* add evm address threshold test

* upd gha workflow

* upd per CRs
  • Loading branch information
forgxyz authored Nov 15, 2024
1 parent 7edd606 commit 4df472b
Show file tree
Hide file tree
Showing 17 changed files with 414 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: dbt_run_scheduled_reward_points_api
run-name: dbt_run_scheduled_reward_points_api
name: dbt_run_scheduled_reward_points_realtime
run-name: dbt_run_scheduled_reward_points_realtime

on:
workflow_dispatch:
Expand All @@ -19,6 +19,9 @@ env:
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
FLOW_POINTS_URL: "${{ secrets.FLOW_POINTS_URL }}"
PRIVATE_KEY: "${{ secrets.PRIVATE_KEY }}"
PUBLIC_ADDRESS: "${{ secrets.PUBLIC_ADDRESS }}"

concurrency:
group: ${{ github.workflow }}
Expand All @@ -40,9 +43,17 @@ jobs:
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
- name: Request User Points Balances
run: >
dbt run -s tag:streamline_non_core --vars '{"STREAMLINE_INVOKE_STREAMS": True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": False}'
dbt run -s streamline__reward_points_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Authenticate with Flow Points API
run: |
python python/points/authenticate.py
- name: Request Points Transfers
run: >
dbt run -s streamline__points_transfers_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Store logs
uses: actions/upload-artifact@v3
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/dbt_run_scheduled_reward_points_silver.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ jobs:
- name: Run DBT Jobs
run: >
dbt run -s silver_api__reward_points
dbt run -s tag:streamline_non_core
- name: Test DBT Models
run: >
dbt test -s silver_api__reward_points
dbt test -s tag:streamline_non_core
continue-on-error: true

- name: Log test results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
unique_key = "reward_points_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['_inserted_timestamp :: DATE', 'address']
cluster_by = ['_inserted_timestamp :: DATE', 'address'],
tags = ['streamline_non_core']
) }}

SELECT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "reward_points",
"sql_limit": "1000",
"sql_limit": "10000",
"producer_batch_size": "1000",
"worker_batch_size": "1000",
"sql_source": "{{this.identifier}}"
}
),
tags = ['streamline_non_core']
)
) }}

WITH evm_addresses AS (
Expand Down
24 changes: 24 additions & 0 deletions models/silver/streamline/external/streamline__evm_addresses.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Schema and data tests for the streamline__evm_addresses model.
version: 2

models:
  - name: streamline__evm_addresses
    description: "Table of unique EVM addresses."
    data_tests:
      # Freshness guard: fails if no row has been modified within the last day.
      - dbt_utils.recency:
          datepart: day
          field: modified_timestamp
          interval: 1

    columns:
      # The EVM address itself — the natural key of the table.
      - name: address
        tests:
          - not_null
          - unique

      - name: modified_timestamp

      - name: inserted_timestamp

      # Surrogate key generated by the model.
      - name: evm_addresses_id

      # dbt invocation id of the run that produced the row.
      - name: _invocation_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Bronze full-refresh (FR) view over the points_transfers external table.
-- Used by the silver model's non-incremental (initial / full-refresh) path to
-- read the complete response history.
{{ config (
    materialized = 'view',
    tags = ['streamline_non_core']
) }}

-- The partition key is parsed from the landed file path: third '/'-separated
-- segment of file_name, text before the first '_', cast to INTEGER.
-- NOTE(review): exact column projection comes from the
-- streamline_external_table_FR_query_v2 macro, defined outside this file.
{{ streamline_external_table_FR_query_v2(
    model = "points_transfers",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Bronze incremental-path view over the points_transfers external table.
-- Used by the silver model's is_incremental() branch; presumably restricted to
-- recent partitions by the macro (vs. the FR variant, which reads everything)
-- — confirm against the streamline_external_table_query_v2 macro definition.
{{ config (
    materialized = 'view',
    tags = ['streamline_non_core']
) }}

-- Partition key parsed from the landed file path: third '/'-separated segment
-- of file_name, text before the first '_', cast to INTEGER. Must match the
-- partition_function used in bronze_api__FR_points_transfers.
{{ streamline_external_table_query_v2(
    model = "points_transfers",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
-- depends_on: {{ ref('bronze_api__points_transfers') }}
-- depends_on: {{ ref('bronze_api__FR_points_transfers') }}
-- Silver model for the Reward Points API "transfer/all" endpoint.
-- Flattens the raw JSON response (address -> batches -> transfers) into one
-- row per (batch_id, transfer_index) and deduplicates repeated API responses,
-- keeping the earliest _inserted_timestamp as the original request timestamp.
--
-- FIX: dedup/merge previously keyed on batch_id alone, which kept a single
-- transfer per batch and silently dropped the rest — contradicting the
-- (batch_id, transfer_index) surrogate key and the schema tests on
-- transfer_index. Both now operate at transfer granularity.
{{ config(
    materialized = 'incremental',
    unique_key = "points_transfers_id",
    incremental_strategy = 'merge',
    merge_exclude_columns = ["inserted_timestamp", "_inserted_timestamp"],
    cluster_by = ['modified_timestamp :: DATE', 'from_address'],
    post_hook = [ "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(from_address, to_address)" ],
    tags = ['streamline_non_core']
) }}

-- Raw responses; partition_key is the request time as epoch seconds.
WITH points_transfers_raw AS (

    SELECT
        partition_key,
        TO_TIMESTAMP(partition_key) :: DATE AS request_date,
        DATA,
        _inserted_timestamp
    FROM

{% if is_incremental() %}
    {{ ref('bronze_api__points_transfers') }}
    WHERE
        _inserted_timestamp >= (
            SELECT
                MAX(_inserted_timestamp)
            FROM
                {{ this }}
        )
{% else %}
    {{ ref('bronze_api__FR_points_transfers') }}
{% endif %}
),
-- Level 1: one row per sending address, carrying its array of batches.
flatten_protocols AS (
    SELECT
        partition_key,
        request_date,
        _inserted_timestamp,
        A.value :address :: STRING AS address,
        A.value :transfers :: ARRAY AS transfers
    FROM
        points_transfers_raw,
        LATERAL FLATTEN(DATA) A
),
-- Level 2: one row per batch, carrying its array of individual transfers.
flatten_batches AS (
    SELECT
        partition_key,
        request_date,
        _inserted_timestamp,
        address AS from_address,
        A.index AS batch_index,
        A.value :batchId :: STRING AS batch_id,
        A.value :status :: STRING AS batch_status,
        A.value :transfers :: ARRAY AS batch_transfers
    FROM
        flatten_protocols,
        LATERAL FLATTEN(
            transfers
        ) A
),
-- Level 3: one row per individual transfer within a batch.
flatten_transfers AS (
    SELECT
        partition_key,
        request_date,
        from_address,
        batch_index,
        batch_id,
        _inserted_timestamp,
        A.index AS transfer_index,
        A.value :boxes :: NUMBER AS boxes,
        A.value :keys :: NUMBER AS keys,
        A.value :points :: NUMBER AS points,
        A.value :toAddressId :: STRING AS to_address
    FROM
        flatten_batches,
        LATERAL FLATTEN(batch_transfers) A
)
SELECT
    request_date,
    batch_id,
    batch_index,
    transfer_index,
    from_address,
    to_address,
    boxes,
    keys,
    points,
    partition_key,
    {{ dbt_utils.generate_surrogate_key(
        ['batch_id', 'transfer_index']
    ) }} AS points_transfers_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id,
    _inserted_timestamp
FROM
    flatten_transfers

-- Dedupe repeated API responses at transfer granularity, keeping the earliest
-- observation so _inserted_timestamp reflects the original request.
qualify(ROW_NUMBER() over (PARTITION BY batch_id, transfer_index
ORDER BY
    _inserted_timestamp ASC)) = 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Schema and data tests for silver_api__points_transfers.
version: 2

models:
  - name: silver_api__points_transfers
    # FIX: corrected typo "dedplicates" -> "deduplicates".
    description: "Response from the Reward Points API Transfers Endpoint. Logs each response and deduplicates by batch_id. Original _inserted_timestamp preserved as request timestamp."
    tests:
      # Freshness guard: fails if no request landed within the last day.
      - dbt_utils.recency:
          datepart: day
          field: request_date
          interval: 1

    columns:
      - name: partition_key

      - name: request_date

      - name: from_address
        tests:
          - not_null

      - name: batch_id
        tests:
          - not_null

      - name: batch_index
        tests:
          - not_null

      - name: transfer_index
        tests:
          - not_null

      - name: boxes
        tests:
          - not_null

      - name: keys
        tests:
          - not_null

      - name: points

      - name: to_address
        tests:
          - not_null

      # Surrogate key: generate_surrogate_key(['batch_id', 'transfer_index']).
      - name: points_transfers_id
        tests:
          - not_null
          - unique

      - name: inserted_timestamp

      - name: modified_timestamp

      - name: _invocation_id
      - name: _inserted_timestamp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- Streamline request model for the Reward Points API transfers endpoint.
-- The post_hook passes this view's single row to udf_bulk_rest_api_v2, which
-- executes the GET request and lands the raw response in the points_transfers
-- external table. sql_limit / batch sizes are 1: the endpoint returns all
-- transfers in a single response, so one request per run suffices.
{{ config (
    materialized = "view",
    post_hook = fsc_utils.if_data_call_function_v2(
        func = '{{this.schema}}.udf_bulk_rest_api_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params = {
            "external_table": "points_transfers",
            "sql_limit": "1",
            "producer_batch_size": "1",
            "worker_batch_size": "1",
            "sql_source": "{{this.identifier}}"
        }
    )
) }}


-- partition_key: request time as epoch seconds — becomes the external-table
-- partition parsed back out by the bronze views' partition_function.
-- The JWT env var defaults to '' so compilation succeeds without auth;
-- the authenticate.py GHA step populates it before this model runs.
-- NOTE(review): '{Service}' is presumably substituted from the Vault path
-- given as the last udf_api argument — confirm against the udf_api definition.
SELECT
    DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
    {{ target.database }}.live.udf_api(
        'GET',
        '{Service}/points/dapp/transfer/all',
        {
            'Authorization': 'Bearer ' || '{{ env_var("JWT", "") }}',
            'Accept': 'application/json',
            'Connection': 'keep-alive',
            'Content-Type': 'application/json',
            'User-Agent': 'Flipside/0.1'
        },
        {},
        'Vault/prod/flow/points-api/prod'
    ) AS request

1 change: 1 addition & 0 deletions models/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ sources:
- name: evm_receipts
- name: evm_traces
- name: reward_points
- name: points_transfers

- name: crosschain_silver
database: crosschain
Expand Down
Loading

0 comments on commit 4df472b

Please sign in to comment.