-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* init api models * rm * add vault path * fix typo json key * rm qualify from first silver model * points spend silver model * upd silver spend columns * gha workflow and tags * disable workflow * disable correct gha * add yml for silver model * tag w streamline_non_core * upd tags, gha workflow, final tweaks * upd so, cluster, unique_key
- Loading branch information
Showing
9 changed files
with
387 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
53 changes: 53 additions & 0 deletions
53
.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
name: dbt_run_scheduled_transaction_entries_realtime | ||
run-name: dbt_run_scheduled_transaction_entries_realtime | ||
|
||
on: | ||
workflow_dispatch: | ||
schedule: | ||
# Daily at 00:00 UTC | ||
- cron: "0 0 * * *" | ||
|
||
env: | ||
USE_VARS: "${{ vars.USE_VARS }}" | ||
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" | ||
DBT_VERSION: "${{ vars.DBT_VERSION }}" | ||
ACCOUNT: "${{ vars.ACCOUNT }}" | ||
ROLE: "${{ vars.ROLE }}" | ||
USER: "${{ vars.USER }}" | ||
PASSWORD: "${{ secrets.PASSWORD }}" | ||
REGION: "${{ vars.REGION }}" | ||
DATABASE: "${{ vars.DATABASE }}" | ||
WAREHOUSE: "${{ vars.WAREHOUSE }}" | ||
SCHEMA: "${{ vars.SCHEMA }}" | ||
|
||
concurrency: | ||
group: ${{ github.workflow }} | ||
|
||
jobs: | ||
dbt: | ||
runs-on: ubuntu-latest | ||
environment: | ||
name: workflow_prod | ||
steps: | ||
- uses: actions/checkout@v3 | ||
- uses: actions/setup-python@v4 | ||
with: | ||
python-version: "3.10" | ||
cache: "pip" | ||
|
||
- name: install dependencies | ||
run: | | ||
pip install -r requirements.txt | ||
dbt deps | ||
- name: Run DBT Jobs | ||
run: > | ||
dbt run -s streamline__transaction_entries_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' | ||
- name: Store logs | ||
uses: actions/upload-artifact@v3 | ||
with: | ||
name: dbt-logs | ||
path: | | ||
logs | ||
target |
9 changes: 9 additions & 0 deletions
9
models/silver/streamline/external/storefront/bronze_api__FR_transaction_entries.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
{{ config ( | ||
materialized = 'view', | ||
tags = ['streamline_non_core', 'rewards_points_spend'] | ||
) }} | ||
|
||
{{ streamline_external_table_FR_query_v2( | ||
model = "transaction_entries", | ||
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" | ||
) }} |
9 changes: 9 additions & 0 deletions
9
models/silver/streamline/external/storefront/bronze_api__transaction_entries.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
{{ config ( | ||
materialized = 'view', | ||
tags = ['streamline_non_core', 'rewards_points_spend'] | ||
) }} | ||
|
||
{{ streamline_external_table_query_v2( | ||
model = "transaction_entries", | ||
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" | ||
) }} |
88 changes: 88 additions & 0 deletions
88
models/silver/streamline/external/storefront/silver_api__reward_points_spend.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
{{ config( | ||
materialized = 'incremental', | ||
unique_key = "entry_id", | ||
incremental_strategy = 'merge', | ||
merge_exclude_columns = ["inserted_timestamp"], | ||
cluster_by = ['_inserted_timestamp :: DATE'], | ||
post_hook = [ "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(user_wallet_address)" ], | ||
tags = ['streamline_non_core', 'rewards_points_spend'] | ||
) }} | ||
|
||
WITH silver_responses AS ( | ||
|
||
SELECT | ||
partition_key, | ||
entry_id, | ||
created_at, | ||
INDEX, | ||
DATA, | ||
_inserted_timestamp | ||
FROM | ||
{{ ref('silver_api__transaction_entries') }} | ||
|
||
{% if is_incremental() %} | ||
WHERE | ||
modified_timestamp >= ( | ||
SELECT | ||
MAX(modified_timestamp) | ||
FROM | ||
{{ this }} | ||
) | ||
{% endif %} | ||
|
||
qualify(ROW_NUMBER() over (PARTITION BY entry_id | ||
ORDER BY | ||
_inserted_timestamp DESC)) = 1 | ||
) | ||
SELECT | ||
entry_id, | ||
created_at, | ||
|
||
DATA :direction :: STRING AS direction, | ||
DATA :amount :: NUMBER AS amount, | ||
DATA :loyaltyAccountStartAmount :: NUMBER AS amount_start, | ||
DATA :loyaltyAccountEndAmount :: NUMBER AS amount_end, | ||
|
||
DATA :idempotencyKey :: STRING AS idempotency_key, | ||
DATA :organizationId :: STRING AS organization_id, | ||
DATA :websiteId :: STRING AS website_id, | ||
|
||
DATA :loyaltyAccountId :: STRING AS account_id, | ||
DATA :loyaltyAccount :user :id :: STRING AS user_id, | ||
DATA :loyaltyAccount :user :walletAddress :: STRING AS user_wallet_address, | ||
|
||
DATA :loyaltyTransactionId :: STRING AS transaction_id, | ||
DATA :loyaltyTransaction :description :: STRING AS transaction_description, | ||
DATA :loyaltyTransaction :type :: STRING AS transaction_type, | ||
|
||
DATA :loyaltyTransaction :loyaltyRule :id :: STRING AS rule_id, | ||
DATA :loyaltyTransaction :loyaltyRule :type :: STRING AS rule_type, | ||
DATA :loyaltyTransaction :loyaltyRule :name :: STRING AS rule_name, | ||
DATA :loyaltyTransaction :loyaltyRule :description :: STRING AS rule_description, | ||
DATA :loyaltyTransaction :loyaltyRule :metadata :: variant AS rule_metadata, | ||
|
||
OBJECT_DELETE( | ||
DATA, | ||
'amount', | ||
'createdAt', | ||
'direction', | ||
'idempotencyKey', | ||
'loyaltyAccount', | ||
'loyaltyAccountId', | ||
'loyaltyAccountEndAmount', | ||
'loyaltyAccountStartAmount', | ||
'loyaltyTransactionId', | ||
'organizationId', | ||
'websiteId' | ||
) AS DATA, | ||
partition_key, | ||
INDEX, | ||
_inserted_timestamp, | ||
{{ dbt_utils.generate_surrogate_key( | ||
['entry_id', 'partition_key'] | ||
) }} AS reward_points_spend_id, | ||
SYSDATE() AS inserted_timestamp, | ||
SYSDATE() AS modified_timestamp, | ||
'{{ invocation_id }}' AS _invocation_id | ||
FROM | ||
silver_responses |
101 changes: 101 additions & 0 deletions
101
models/silver/streamline/external/storefront/silver_api__reward_points_spend.yml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
version: 2 | ||
|
||
models: | ||
- name: silver_api__reward_points_spend | ||
description: "Response from the Storefront Transaction Entries API" | ||
tests: | ||
- dbt_utils.recency: | ||
datepart: day | ||
field: created_at | ||
interval: 1 | ||
|
||
columns: | ||
- name: entry_id | ||
tests: | ||
- not_null | ||
- unique | ||
|
||
- name: created_at | ||
tests: | ||
- not_null | ||
|
||
- name: direction | ||
tests: | ||
- not_null | ||
|
||
- name: amount | ||
tests: | ||
- not_null | ||
|
||
- name: amount_start | ||
tests: | ||
- not_null | ||
|
||
- name: amount_end | ||
tests: | ||
- not_null | ||
|
||
- name: idempotency_key | ||
|
||
- name: organization_id | ||
tests: | ||
- not_null | ||
|
||
- name: website_id | ||
tests: | ||
- not_null | ||
|
||
- name: account_id | ||
tests: | ||
- not_null | ||
|
||
- name: user_id | ||
tests: | ||
- not_null | ||
|
||
- name: user_wallet_address | ||
tests: | ||
- not_null | ||
|
||
- name: transaction_id | ||
tests: | ||
- not_null | ||
- unique | ||
|
||
- name: transaction_description | ||
|
||
- name: transaction_type | ||
tests: | ||
- not_null | ||
|
||
- name: rule_id | ||
tests: | ||
- not_null | ||
|
||
- name: rule_type | ||
tests: | ||
- not_null | ||
|
||
- name: rule_name | ||
|
||
- name: rule_description | ||
|
||
- name: rule_metadata | ||
|
||
- name: data | ||
|
||
- name: partition_key | ||
tests: | ||
- not_null | ||
|
||
- name: index | ||
|
||
- name: _inserted_timestamp | ||
|
||
- name: reward_points_spend_id | ||
|
||
- name: inserted_timestamp | ||
|
||
- name: modified_timestamp | ||
|
||
- name: _invocation_id |
63 changes: 63 additions & 0 deletions
63
models/silver/streamline/external/storefront/silver_api__transaction_entries.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
-- depends_on: {{ ref('bronze_api__transaction_entries') }} | ||
-- depends_on: {{ ref('bronze_api__FR_transaction_entries') }} | ||
{{ config( | ||
materialized = 'incremental', | ||
unique_key = "transaction_entries_id", | ||
incremental_strategy = 'merge', | ||
merge_exclude_columns = ["inserted_timestamp"], | ||
cluster_by = ['_inserted_timestamp :: DATE'], | ||
tags = ['streamline_non_core', 'rewards_points_spend'] | ||
) }} | ||
|
||
WITH bronze AS ( | ||
|
||
SELECT | ||
partition_key, | ||
DATA, | ||
VALUE :STARTING_AFTER :: STRING AS starting_after, | ||
VALUE :API_LIMIT :: INTEGER AS api_limit, | ||
ARRAY_SIZE( | ||
DATA :data :: ARRAY | ||
) AS entry_count, | ||
DATA :data [0] :id :: STRING AS first_entry_id, | ||
DATA :data [entry_count - 1] :id :: STRING AS last_entry_id, | ||
_inserted_timestamp | ||
FROM | ||
|
||
{% if is_incremental() %} | ||
{{ ref('bronze_api__transaction_entries') }} | ||
WHERE | ||
_inserted_timestamp >= ( | ||
SELECT | ||
MAX(_inserted_timestamp) _inserted_timestamp | ||
FROM | ||
{{ this }} | ||
) | ||
{% else %} | ||
{{ ref('bronze_api__FR_transaction_entries') }} | ||
{% endif %} | ||
) | ||
SELECT | ||
partition_key, | ||
entry_count, | ||
starting_after, | ||
api_limit, | ||
first_entry_id AS request_first_entry_id, | ||
last_entry_id AS request_last_entry_id, | ||
VALUE :createdAt :: timestamp_ntz AS created_at, | ||
VALUE :id :: STRING AS entry_id, | ||
INDEX :: INTEGER AS INDEX, | ||
VALUE :: variant AS DATA, | ||
_inserted_timestamp, | ||
{{ dbt_utils.generate_surrogate_key( | ||
['entry_id', 'partition_key'] | ||
) }} AS transaction_entries_id, | ||
SYSDATE() AS inserted_timestamp, | ||
SYSDATE() AS modified_timestamp, | ||
'{{ invocation_id }}' AS _invocation_id | ||
FROM | ||
bronze, | ||
LATERAL FLATTEN( | ||
input => DATA :data :: ARRAY | ||
) | ||
|
62 changes: 62 additions & 0 deletions
62
models/silver/streamline/external/storefront/streamline__transaction_entries_realtime.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
-- depends_on: {{ ref('silver_api__transaction_entries') }} | ||
{{ config ( | ||
materialized = "view", | ||
post_hook = fsc_utils.if_data_call_function_v2( | ||
func = '{{this.schema}}.udf_bulk_rest_api_v2', | ||
target = "{{this.schema}}.{{this.identifier}}", | ||
params ={ "external_table": "transaction_entries", | ||
"sql_limit": "1", | ||
"producer_batch_size": "1", | ||
"worker_batch_size": "1", | ||
"sql_source": "{{this.identifier}}" } | ||
) | ||
) }} | ||
|
||
{% if not var( | ||
'STOREFRONT_INITIAL_RUN', | ||
false | ||
) %} | ||
{% if execute %} | ||
{% set query %} | ||
WITH target_entry_id AS ( | ||
|
||
SELECT | ||
entry_id, | ||
ROW_NUMBER() over ( | ||
ORDER BY | ||
partition_key DESC, | ||
INDEX DESC | ||
) AS rn | ||
FROM | ||
{{ ref('silver_api__transaction_entries') }} | ||
{# WHERE _inserted_timestamp >= CURRENT_DATE - 3 #} | ||
) | ||
SELECT | ||
entry_id | ||
FROM | ||
target_entry_id | ||
WHERE | ||
rn = 2 | ||
{% endset %} | ||
{% set starting_after = run_query(query).columns [0].values() [0] %} | ||
{{ log( | ||
"last_id: " ~ starting_after, | ||
info = True | ||
) }} | ||
{% endif %} | ||
{% endif %} | ||
|
||
SELECT | ||
{{ var( | ||
'API_LIMIT', | ||
1000 | ||
) }} AS api_limit, | ||
'{{ starting_after }}' AS starting_after, | ||
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key, | ||
{{ target.database }}.live.udf_api( | ||
'GET', | ||
'{Service}/api/loyalty/transaction_entries' || '?limit=' || api_limit{% if not var('STOREFRONT_INITIAL_RUN', false) %} || '&startingAfter=' || '{{ starting_after }}'{% endif %}, | ||
{ 'x-api-key': '{Authentication}' }, | ||
{}, | ||
'Vault/prod/flow/snag-api' | ||
) AS request |
Oops, something went wrong.