diff --git a/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml b/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml index aa0e61b1..6b246d31 100644 --- a/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml +++ b/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml @@ -43,6 +43,7 @@ jobs: - name: Run DBT Jobs run: > dbt run -s streamline__transaction_entries_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + dbt run -s streamline__minting_assets_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' - name: Store logs uses: actions/upload-artifact@v3 diff --git a/models/sources.yml b/models/sources.yml index 4e64ee55..f33c710e 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -128,6 +128,7 @@ sources: - name: reward_points - name: transaction_entries - name: points_transfers + - name: minting_assets - name: crosschain_silver database: crosschain diff --git a/models/streamline/external/storefront_items/bronze_api__FR_minting_assets.sql b/models/streamline/external/storefront_items/bronze_api__FR_minting_assets.sql new file mode 100644 index 00000000..f5a3a042 --- /dev/null +++ b/models/streamline/external/storefront_items/bronze_api__FR_minting_assets.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['streamline_non_core', 'minting_assets'] +) }} + +{{ streamline_external_table_FR_query_v2( + model = "minting_assets", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/streamline/external/storefront_items/bronze_api__minting_assets.sql b/models/streamline/external/storefront_items/bronze_api__minting_assets.sql new file mode 100644 index 00000000..85118558 --- /dev/null +++ b/models/streamline/external/storefront_items/bronze_api__minting_assets.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['streamline_non_core', 'minting_assets'] +) }} + +{{ streamline_external_table_query_v2( + model = "minting_assets", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/streamline/external/storefront_items/silver_api__minting_assets.sql b/models/streamline/external/storefront_items/silver_api__minting_assets.sql new file mode 100644 index 00000000..3879e75a --- /dev/null +++ b/models/streamline/external/storefront_items/silver_api__minting_assets.sql @@ -0,0 +1,63 @@ +-- depends_on: {{ ref('bronze_api__minting_assets') }} +-- depends_on: {{ ref('bronze_api__FR_minting_assets') }} +{{ config( + materialized = 'incremental', + unique_key = "minting_assets_id", + incremental_strategy = 'merge', + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['_inserted_timestamp :: DATE'], + tags = ['streamline_non_core'] +) }} + +WITH bronze AS ( + + SELECT + partition_key, + DATA, + VALUE :STARTING_AFTER :: STRING AS starting_after, + VALUE :API_LIMIT :: INTEGER AS api_limit, + ARRAY_SIZE( + DATA :data :: ARRAY + ) AS item_count, + DATA :data [0] :id :: STRING AS first_item_id, + DATA :data [item_count - 1] :id :: STRING AS last_item_id, + _inserted_timestamp + FROM + +{% if is_incremental() %} +{{ ref('bronze_api__minting_assets') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze_api__FR_minting_assets') }} +{% endif %} +) +SELECT + partition_key, + item_count, + starting_after, + api_limit, + first_item_id AS request_first_item_id, + last_item_id AS request_last_item_id, + VALUE :createdAt :: timestamp_ntz AS created_at, + VALUE :id :: STRING AS item_id, + INDEX :: INTEGER AS INDEX, + VALUE :: variant AS DATA, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['item_id', 'partition_key'] + ) }} AS minting_assets_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + bronze, + LATERAL FLATTEN( + input => DATA :data :: ARRAY + ) + diff --git a/models/streamline/external/storefront_items/silver_api__storefront_items.sql b/models/streamline/external/storefront_items/silver_api__storefront_items.sql new file mode 100644 index 00000000..f3f76a2f --- /dev/null +++ b/models/streamline/external/storefront_items/silver_api__storefront_items.sql @@ -0,0 +1,102 @@ +{{ config( + materialized = 'incremental', + unique_key = "storefront_item_id", + incremental_strategy = 'merge', + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['_inserted_timestamp :: DATE'], + post_hook = [ "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(storefront_item_id)" ], + tags = ['streamline_non_core'] +) }} + +WITH silver_responses AS ( + + SELECT + partition_key, + item_id, + created_at, + INDEX, + DATA, + _inserted_timestamp + FROM + {{ ref('silver_api__minting_assets') }} + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY item_id +ORDER BY + _inserted_timestamp DESC)) = 1 +) +SELECT + item_id, + created_at, + + DATA :additionalMedia :: variant AS additional_media, + DATA :animationUrl :: STRING AS animation_url, + DATA :artistInfo :: variant AS artist_info, + DATA :assetNr :: STRING AS asset_nr, + DATA :attributes :: variant AS attributes, + DATA :auctionId :: STRING AS auction_id, + DATA :auctionItems :: variant AS auction_items, + DATA :burnToRedeemDetails :: variant AS burn_to_redeem_details, + DATA :burnToRedeemType :: STRING AS burn_to_redeem_type, + DATA :burnedAt :: timestamp_ntz AS burned_at, + DATA :contractAddress :: STRING AS contract_address, + DATA :currencyAddress :: STRING AS currency_address, + DATA :currencyDecimals :: INTEGER AS currency_decimals, + DATA :deletedAt :: timestamp_ntz AS deleted_at, + DATA :description :: STRING AS description, + DATA :doesRequireW9 :: BOOLEAN AS does_require_w9, + DATA :enableBurnToRedeem :: BOOLEAN AS enable_burn_to_redeem, + DATA :imageUrl :: STRING AS image_url, + DATA :isListed :: BOOLEAN AS is_listed, + DATA :isPhygitalItem :: BOOLEAN AS is_phygital_item, + DATA :isPriceInUSD :: BOOLEAN AS is_price_in_usd, + DATA :isRedeemableSeparateFromPurchase :: BOOLEAN AS is_redeemable_separate_from_purchase, + DATA :listingEndsAt :: timestamp_ntz AS listing_ends_at, + DATA :listingStartsAt :: timestamp_ntz AS listing_starts_at, + DATA :listingType :: STRING AS listing_type, + DATA :loyaltyCurrency :: STRING AS loyalty_currency, + DATA :loyaltyCurrencyId :: STRING AS loyalty_currency_id, + DATA :metadataUri :: STRING AS metadata_uri, + DATA :mintStatus :: STRING AS mint_status, + DATA :mintingContract :: STRING AS minting_contract, + DATA :mintingContractAssetMintStatus :: STRING AS minting_contract_asset_mint_status, + DATA :mintingContractId :: STRING AS minting_contract_id, + DATA :name :: STRING AS name, + DATA :network :: STRING AS network, + DATA :organizationId :: STRING AS organization_id, + DATA :perUserMintLimit :: INTEGER AS per_user_mint_limit, + DATA :perUserMintMinLimit :: INTEGER AS per_user_mint_min_limit, + DATA :preRevealMedia :: variant AS pre_reveal_media, + DATA :price :: NUMBER AS price, + DATA :quantity :: INTEGER AS quantity, + DATA :quantityMinted :: INTEGER AS quantity_minted, + DATA :redeemableUntil :: timestamp_ntz AS redeemable_until, + DATA :revealedAt :: timestamp_ntz AS revealed_at, + DATA :shippingPrice :: NUMBER AS shipping_price, + DATA :shippingWithin :: STRING AS shipping_within, + DATA :statusTracker :: STRING AS status_tracker, + DATA :tokenId :: STRING AS token_id, + DATA :tokenType :: STRING AS token_type, + DATA :updatedAt :: timestamp_ntz AS updated_at, + DATA :websiteId :: STRING AS website_id, + + partition_key, + INDEX, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['item_id', 'partition_key'] + ) }} AS storefront_item_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + silver_responses diff --git a/models/streamline/external/storefront_items/silver_api__storefront_items.yml b/models/streamline/external/storefront_items/silver_api__storefront_items.yml new file mode 100644 index 00000000..36b797a5 --- /dev/null +++ b/models/streamline/external/storefront_items/silver_api__storefront_items.yml @@ -0,0 +1,36 @@ +version: 2 + +models: + - name: silver_api__storefront_items + description: "Response from the Minting Assets API" + tests: + - dbt_utils.recency: + datepart: day + field: created_at + interval: 1 + + columns: + - name: item_id + tests: + - not_null + - unique + + - name: created_at + tests: + - not_null + + - name: partition_key + tests: + - not_null + + - name: index + + - name: _inserted_timestamp + + - name: reward_points_spend_id + + - name: inserted_timestamp + + - name: modified_timestamp + + - name: _invocation_id diff --git a/models/streamline/external/storefront_items/streamline__minting_assets_realtime.sql b/models/streamline/external/storefront_items/streamline__minting_assets_realtime.sql new file mode 100644 index 00000000..b28d5923 --- /dev/null +++ b/models/streamline/external/storefront_items/streamline__minting_assets_realtime.sql @@ -0,0 +1,63 @@ + -- depends_on: {{ ref('silver_api__storefront_items') }} + +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = '{{this.schema}}.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table": "minting_assets", + "sql_limit": "1", + "producer_batch_size": "1", + "worker_batch_size": "1", + "sql_source": "{{this.identifier}}" } + ) +) }} + +{% if not var( + 'STOREFRONT_INITIAL_RUN', + false + ) %} + {% if execute %} + {% set query %} + WITH target_id AS ( + + SELECT + item_id, + ROW_NUMBER() over ( + ORDER BY + partition_key DESC, + INDEX DESC + ) AS rn + FROM + {{ ref('silver_api__storefront_items') }} + {# WHERE _inserted_timestamp >= CURRENT_DATE - 3 #} + ) + SELECT + item_id + FROM + target_id + WHERE + rn = 2 + {% endset %} + {% set starting_after = run_query(query).columns [0].values() [0] %} + {{ log( + "last_id: " ~ starting_after, + info = True + ) }} + {% endif %} +{% endif %} + +SELECT + {{ var( + 'API_LIMIT', + 1000 + ) }} AS api_limit, + '{{ starting_after }}' AS starting_after, + DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{Service}/api/minting/assets' || '?limit=' || api_limit{% if not var('STOREFRONT_INITIAL_RUN', false) %} || '&startingAfter=' || '{{ starting_after }}'{% endif %}, + { 'x-api-key': '{Authentication}' }, + {}, + 'Vault/prod/flow/snag-api' + ) AS request