From 64311057dc369e01d83e39da6e6c33e2516d66fd Mon Sep 17 00:00:00 2001 From: Jack Forgash <58153492+forgxyz@users.noreply.github.com> Date: Wed, 8 Nov 2023 21:13:42 -0700 Subject: [PATCH] mainnet23 history model w current external table setup --- ...eamline__get_blocks_history_mainnet_23.sql | 45 ++++++++++++++ ...ne__get_collections_history_mainnet_23.sql | 58 +++++++++++++++++++ ...transaction_results_history_mainnet_23.sql | 57 ++++++++++++++++++ ...e__get_transactions_history_mainnet_23.sql | 57 ++++++++++++++++++ 4 files changed, 217 insertions(+) create mode 100644 models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet_23.sql create mode 100644 models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet_23.sql create mode 100644 models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_23.sql create mode 100644 models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_23.sql diff --git a/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet_23.sql b/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet_23.sql new file mode 100644 index 00000000..c2596c83 --- /dev/null +++ b/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet_23.sql @@ -0,0 +1,45 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet23.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_history_23'] +) }} + +WITH blocks AS ( + + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + WHERE + block_height BETWEEN 55114467 + AND 65264618 + EXCEPT + SELECT + block_number AS block_height + FROM + {{ ref("streamline__complete_get_blocks") }} + WHERE + block_height BETWEEN 55114467 + AND 65264618 +) +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_block_by_height', + 'block_height', + block_height, + 'method_params', + OBJECT_CONSTRUCT( + 'height', + block_height + ) + ) AS request +FROM + blocks +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet_23.sql b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet_23.sql new file mode 100644 index 00000000..bf69832b --- /dev/null +++ b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet_23.sql @@ -0,0 +1,58 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet23.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_history_23'] +) }} + +WITH +-- CTE to get all block_heights and their associated collection_ids from the complete_get_blocks_history table +block_collections AS ( + + SELECT + cb.block_number AS block_height, + collection_guarantee.value :collection_id :: STRING AS collection_id + FROM + {{ ref("streamline__complete_get_blocks") }} + cb, + LATERAL FLATTEN( + input => cb.data :collection_guarantees + ) AS collection_guarantee + WHERE + block_height BETWEEN 55114467 + AND 65264618 +), +-- CTE to identify collections that haven't been ingested yet +collections_to_ingest AS ( + SELECT + bc.block_height, + bc.collection_id + FROM + block_collections bc + LEFT JOIN {{ ref("streamline__complete_get_collections") }} C + ON bc.block_height = C.block_number + AND bc.collection_id = C.id + WHERE + C.id IS NULL +) +-- Generate the requests based on the missing collections +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_collection_by_i_d', + 'block_height', + block_height :: INTEGER, + 'method_params', + OBJECT_CONSTRUCT( + 'id', + collection_id + ) + ) AS request +FROM + collections_to_ingest +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_23.sql b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_23.sql new file mode 100644 index 00000000..83856cf3 --- /dev/null +++ b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_23.sql @@ -0,0 +1,57 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet23.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_history_23'] +) }} + +WITH collection_transactions AS ( + + SELECT + block_number AS block_height, + TRANSACTION.value :: STRING AS transaction_id + FROM + {{ ref('streamline__complete_get_collections') }} + cch, + LATERAL FLATTEN( + input => cch.data :transaction_ids + ) AS TRANSACTION + WHERE + block_height BETWEEN 55114467 + AND 65264618 +), +-- CTE to identify transactions that haven't been ingested yet +transactions_to_ingest AS ( + SELECT + ct.block_height, + ct.transaction_id + FROM + collection_transactions ct + LEFT JOIN {{ ref("streamline__complete_get_transaction_results") }} + t + ON ct.transaction_id = t.id + WHERE + t.id IS NULL +) -- Generate the requests based on the missing transactions +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_transaction_result', + 'block_height', + block_height :: INTEGER, + 'transaction_id', + transaction_id, + 'method_params', + OBJECT_CONSTRUCT( + 'id', + transaction_id + ) + ) AS request +FROM + transactions_to_ingest +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_23.sql b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_23.sql new file mode 100644 index 00000000..ea68a096 --- /dev/null +++ b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_23.sql @@ -0,0 +1,57 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet23.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH collection_transactions AS ( + + SELECT + block_number AS block_height, + TRANSACTION.value :: STRING AS transaction_id + FROM + {{ ref('streamline__complete_get_collections') }} + cch, + LATERAL FLATTEN( + input => cch.data :transaction_ids + ) AS TRANSACTION + WHERE + block_height BETWEEN 55114467 + AND 65264618 +), +-- CTE to identify transactions that haven't been ingested yet +transactions_to_ingest AS ( + SELECT + ct.block_height, + ct.transaction_id + FROM + collection_transactions ct + LEFT JOIN {{ ref("streamline__complete_get_transactions") }} + t + ON ct.transaction_id = t.id + WHERE + t.id IS NULL +) +-- Generate the requests based on the missing transactions +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_transaction', + 'block_height', + block_height :: INTEGER, + 'transaction_id', + transaction_id, + 'method_params', + OBJECT_CONSTRUCT( + 'id', + transaction_id + ) + ) AS request +FROM + transactions_to_ingest +ORDER BY + block_height ASC