Skip to content

Commit

Permalink
AN-5517/api integration - GET minting/assets (#381)
Browse files Browse the repository at this point in the history
* api integration - GET minting/assets

* add to gha workflow

* fixes
  • Loading branch information
forgxyz authored Nov 21, 2024
1 parent 15dcc02 commit 6117353
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ jobs:
- name: Run DBT Jobs
run: >
dbt run -s streamline__transaction_entries_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
dbt run -s streamline__minting_assets_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Store logs
uses: actions/upload-artifact@v3
Expand Down
1 change: 1 addition & 0 deletions models/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ sources:
- name: reward_points
- name: transaction_entries
- name: points_transfers
- name: minting_assets

- name: crosschain_silver
database: crosschain
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['streamline_non_core', 'minting_assets']
) }}

{{ streamline_external_table_FR_query_v2(
model = "minting_assets",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['streamline_non_core', 'minting_assets']
) }}

{{ streamline_external_table_query_v2(
model = "minting_assets",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
-- depends_on: {{ ref('bronze_api__minting_assets') }}
-- depends_on: {{ ref('bronze_api__FR_minting_assets') }}
{{ config(
materialized = 'incremental',
unique_key = "minting_assets_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['_inserted_timestamp :: DATE'],
tags = ['streamline_non_core']
) }}

WITH bronze AS (

SELECT
partition_key,
DATA,
VALUE :STARTING_AFTER :: STRING AS starting_after,
VALUE :API_LIMIT :: INTEGER AS api_limit,
ARRAY_SIZE(
DATA :data :: ARRAY
) AS item_count,
DATA :data [0] :id :: STRING AS first_item_id,
DATA :data [item_count - 1] :id :: STRING AS last_item_id,
_inserted_timestamp
FROM

{% if is_incremental() %}
{{ ref('bronze_api__minting_assets') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze_api__FR_minting_assets') }}
{% endif %}
)
SELECT
partition_key,
item_count,
starting_after,
api_limit,
first_item_id AS request_first_item_id,
last_item_id AS request_last_item_id,
VALUE :createdAt :: timestamp_ntz AS created_at,
VALUE :id :: STRING AS item_id,
INDEX :: INTEGER AS INDEX,
VALUE :: variant AS DATA,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['item_id', 'partition_key']
) }} AS minting_assets_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze,
LATERAL FLATTEN(
input => DATA :data :: ARRAY
)

Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
{{ config(
materialized = 'incremental',
unique_key = "storefront_item_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['_inserted_timestamp :: DATE'],
post_hook = [ "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(storefront_item_id)" ],
tags = ['streamline_non_core']
) }}

WITH silver_responses AS (

SELECT
partition_key,
item_id,
created_at,
INDEX,
DATA,
_inserted_timestamp
FROM
{{ ref('silver_api__minting_assets') }}

{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

qualify(ROW_NUMBER() over (PARTITION BY item_id
ORDER BY
_inserted_timestamp DESC)) = 1
)
SELECT
item_id,
created_at,

DATA :additionalMedia :: variant AS additional_media,
DATA :animationUrl :: STRING AS animation_url,
DATA :artistInfo :: variant AS artist_info,
DATA :assetNr :: STRING AS asset_nr,
DATA :attributes :: variant AS attributes,
DATA :auctionId :: STRING AS auction_id,
DATA :auctionItems :: variant AS auction_items,
DATA :burnToRedeemDetails :: variant AS burn_to_redeem_details,
DATA :burnToRedeemType :: STRING AS burn_to_redeem_type,
DATA :burnedAt :: timestamp_ntz AS burned_at,
DATA :contractAddress :: STRING AS contract_address,
DATA :currencyAddress :: STRING AS currency_address,
DATA :currencyDecimals :: INTEGER AS currency_decimals,
DATA :deletedAt :: timestamp_ntz AS deleted_at,
DATA :description :: STRING AS description,
DATA :doesRequireW9 :: BOOLEAN AS does_require_w9,
DATA :enableBurnToRedeem :: BOOLEAN AS enable_burn_to_redeem,
DATA :imageUrl :: STRING AS image_url,
DATA :isListed :: BOOLEAN AS is_listed,
DATA :isPhygitalItem :: BOOLEAN AS is_phygital_item,
DATA :isPriceInUSD :: BOOLEAN AS is_price_in_usd,
DATA :isRedeemableSeparateFromPurchase :: BOOLEAN AS is_redeemable_separate_from_purchase,
DATA :listingEndsAt :: timestamp_ntz AS listing_ends_at,
DATA :listingStartsAt :: timestamp_ntz AS listing_starts_at,
DATA :listingType :: STRING AS listing_type,
DATA :loyaltyCurrency :: STRING AS loyalty_currency,
DATA :loyaltyCurrencyId :: STRING AS loyalty_currency_id,
DATA :metadataUri :: STRING AS metadata_uri,
DATA :mintStatus :: STRING AS mint_status,
DATA :mintingContract :: STRING AS minting_contract,
DATA :mintingContractAssetMintStatus :: STRING AS minting_contract_asset_mint_status,
DATA :mintingContractId :: STRING AS minting_contract_id,
DATA :name :: STRING AS name,
DATA :network :: STRING AS network,
DATA :organizationId :: STRING AS organization_id,
DATA :perUserMintLimit :: INTEGER AS per_user_mint_limit,
DATA :perUserMintMinLimit :: INTEGER AS per_user_mint_min_limit,
DATA :preRevealMedia :: variant AS pre_reveal_media,
DATA :price :: NUMBER AS price,
DATA :quantity :: INTEGER AS quantity,
DATA :quantityMinted :: INTEGER AS quantity_minted,
DATA :redeemableUntil :: timestamp_ntz AS redeemable_until,
DATA :revealedAt :: timestamp_ntz AS revealed_at,
DATA :shippingPrice :: NUMBER AS shipping_price,
DATA :shippingWithin :: STRING AS shipping_within,
DATA :statusTracker :: STRING AS status_tracker,
DATA :tokenId :: STRING AS token_id,
DATA :tokenType :: STRING AS token_type,
DATA :updatedAt :: timestamp_ntz AS updated_at,
DATA :websiteId :: STRING AS website_id,

partition_key,
INDEX,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['item_id', 'partition_key']
) }} AS storefront_item_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
silver_responses
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
version: 2

models:
- name: silver_api__storefront_items
description: "Response from the Minting Assets API"
tests:
- dbt_utils.recency:
datepart: day
field: created_at
interval: 1

columns:
- name: item_id
tests:
- not_null
- unique

- name: created_at
tests:
- not_null

- name: partition_key
tests:
- not_null

- name: index

- name: _inserted_timestamp

- name: reward_points_spend_id

- name: inserted_timestamp

- name: modified_timestamp

- name: _invocation_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
-- depends_on: {{ ref('silver_api__storefront_items') }}

{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = '{{this.schema}}.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table": "minting_assets",
"sql_limit": "1",
"producer_batch_size": "1",
"worker_batch_size": "1",
"sql_source": "{{this.identifier}}" }
)
) }}

{% if not var(
'STOREFRONT_INITIAL_RUN',
false
) %}
{% if execute %}
{% set query %}
WITH target_id AS (

SELECT
item_id,
ROW_NUMBER() over (
ORDER BY
partition_key DESC,
INDEX DESC
) AS rn
FROM
{{ ref('silver_api__storefront_items') }}
{# WHERE _inserted_timestamp >= CURRENT_DATE - 3 #}
)
SELECT
item_id
FROM
target_id
WHERE
rn = 2
{% endset %}
{% set starting_after = run_query(query).columns [0].values() [0] %}
{{ log(
"last_id: " ~ starting_after,
info = True
) }}
{% endif %}
{% endif %}

SELECT
{{ var(
'API_LIMIT',
1000
) }} AS api_limit,
'{{ starting_after }}' AS starting_after,
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{Service}/api/minting/assets' || '?limit=' || api_limit{% if not var('STOREFRONT_INITIAL_RUN', false) %} || '&startingAfter=' || '{{ starting_after }}'{% endif %},
{ 'x-api-key': '{Authentication}' },
{},
'Vault/prod/flow/snag-api'
) AS request

0 comments on commit 6117353

Please sign in to comment.