Stream 1088/near live view udtfs #99

Closed · wants to merge 15 commits
31 changes: 31 additions & 0 deletions Makefile
@@ -0,0 +1,31 @@
SHELL := /bin/bash

rm_logs:
@if [ -d logs ]; then \
rm -r logs 2>/dev/null || echo "Warning: Could not remove logs directory"; \
else \
echo "Logs directory does not exist"; \
fi

deploy_near_mainnet_lv: rm_logs
dbt run \
-s livequery_models.deploy.near.near__mainnet \
--vars '{LQ_UPDATE_UDFS_AND_SPS: true, UPDATE_UDFS_AND_SPS: false}' \
--profiles-dir ~/.dbt \
--profile livequery \
--target dev

compile_near_mainnet: rm_logs
dbt compile \
-s livequery_models.deploy.near.near__mainnet \
--profiles-dir ~/.dbt \
--profile livequery \
--target dev

deploy_fact_blocks: rm_logs
dbt run \
-s livequery_models.deploy.near.silver.streamline.near_mainnet__fact_blocks \
--vars '{LQ_UPDATE_UDFS_AND_SPS: true}' \
--profiles-dir ~/.dbt \
--profile livequery \
--target dev
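
With these targets in place, a typical local workflow looks like this (commands taken directly from the targets above):

```sh
# Deploy the NEAR mainnet live-view UDFs to the dev target
make deploy_near_mainnet_lv

# Compile only, to inspect the rendered SQL without deploying
make compile_near_mainnet

# Deploy just the fact_blocks streamline model
make deploy_fact_blocks
```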
8 changes: 4 additions & 4 deletions README.md
@@ -462,20 +462,20 @@ livequery:

To control the creation of UDF or SP macros with dbt run:

-* UPDATE_UDFS_AND_SPS
+* LQ_UPDATE_UDFS_AND_SPS
When True, executes all macros included in the on-run-start hooks within dbt_project.yml on model run as normal
When False, none of the on-run-start macros are executed on model run

Default values are False

* Usage:
-  `dbt run --var '{"UPDATE_UDFS_AND_SPS":True}' -m ...`
+  `dbt run --var '{"LQ_UPDATE_UDFS_AND_SPS":True}' -m ...`

Dropping and creating UDFs can also be done without running a model:

```sh
-dbt run-operation create_udfs --vars '{"UPDATE_UDFS_AND_SPS":True}' --args '{"drop_":false}'
-dbt run-operation create_udfs --vars '{"UPDATE_UDFS_AND_SPS":True}' --args '{"drop_":true}'
+dbt run-operation create_udfs --vars '{"LQ_UPDATE_UDFS_AND_SPS":True}' --args '{"drop_":false}'
+dbt run-operation create_udfs --vars '{"LQ_UPDATE_UDFS_AND_SPS":True}' --args '{"drop_":true}'
```

## Resources
22 changes: 21 additions & 1 deletion dbt_project.yml
@@ -36,6 +36,22 @@ models:
        +tags: evm
      marketplace:
        +tags: marketplace
      near:
        silver:
          streamline:
            +schema: near_mainnet
            +enabled: true
  near_models:
    +schema: streamline
    +database: near_dev
    gold:
      core:
        +schema: core
        +materialized: ephemeral
    silver:
      streamline:
        silver__streamline_blocks:
          +enabled: true # set to false if you need to override the models

tests:
  +store_failures: true # all tests
@@ -60,8 +76,12 @@ on-run-end:
# using the `{{ config(...) }}` macro.

vars:
  near_models:
    UPDATE_UDFS_AND_SPS: false

  "dbt_date:time_zone": GMT
  UPDATE_UDFS_AND_SPS: false
  LQ_UPDATE_UDFS_AND_SPS: false
  DROP_UDFS_AND_SPS: false
  UPDATE_SNOWFLAKE_TAGS: true
  STREAMLINE_INVOKE_STREAMS: False
14,345 changes: 14,344 additions & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion macros/create_sps.sql
@@ -1,5 +1,5 @@
{% macro create_sps() %}
-    {% if var("UPDATE_UDFS_AND_SPS") %}
+    {% if var("LQ_UPDATE_UDFS_AND_SPS") %}
{% if target.database == 'LIVEQUERY' %}
CREATE schema IF NOT EXISTS _internal;
{{ sp_create_prod_clone('_internal') }};
6 changes: 3 additions & 3 deletions macros/livequery/manage_udfs.sql
@@ -152,7 +152,7 @@
This macro is used to deploy functions using ephemeral models.
It should only be used within an ephemeral model.
#}
-    {% if execute and (var("UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %}
+    {% if execute and (var("LQ_UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %}
{% set sql %}
{{- crud_udfs(config, this.schema, var("DROP_UDFS_AND_SPS")) -}}
{%- endset -%}
@@ -174,7 +174,7 @@
{%- set blockchain = this.schema -%}
{%- set network = this.identifier -%}
{% set schema = blockchain ~ "_" ~ network %}
-    {% if execute and (var("UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %}
+    {% if execute and (var("LQ_UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %}
{% set sql %}
{% for config in configs %}
{{- crud_udfs_by_chain(config, blockchain, network, var("DROP_UDFS_AND_SPS")) -}}
@@ -197,7 +197,7 @@
#}
{%- set schema = this.schema -%}
{%- set utility_schema = this.identifier -%}
-    {% if execute and (var("UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %}
+    {% if execute and (var("LQ_UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %}
{% set sql %}
{% for config in configs %}
{{- crud_udfs_by_marketplace(config, schema, utility_schema, var("DROP_UDFS_AND_SPS")) -}}
37 changes: 37 additions & 0 deletions macros/near/near.yaml.sql
@@ -0,0 +1,37 @@
{% macro config_near_high_level_abstractions(blockchain, network) -%}
{#
    This macro generates the high-level abstraction (UDF and UDTF) configs for the
    NEAR blockchain.
#}
{% set schema = blockchain ~ "_" ~ network %}

- name: {{ schema -}}.udf_get_block_data
signature:
    - [file_url, STRING, File URL created using the BUILD_SCOPED_FILE_URL() Snowflake built-in function]
return_type:
- STRING
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'process_file'
COMMENT = $$A UDF to retrieve NEAR block data stored in files from the Near Lake Snowflake External Stage.$$
sql: |
{{ near_live_view_udf_get_block_data() | indent(4) -}}

- name: {{ schema -}}.tf_fact_blocks
signature:
- [block_id, INTEGER, The start block height to get the blocks from]
- [to_latest, BOOLEAN, Whether to continue fetching blocks until the latest block or not]
return_type:
- "TABLE(block_id NUMBER, block_timestamp TIMESTAMP_NTZ, block_hash STRING, tx_count STRING, block_author STRING, header VARIANT, block_challenges_result ARRAY, block_challenges_root STRING, chunk_headers_root STRING, chunk_tx_root STRING, chunk_mask VARIANT, chunk_receipts_root STRING, chunks VARIANT, chunks_included NUMBER, epoch_id STRING, epoch_sync_data_hash STRING, events VARIANT, gas_price NUMBER, last_ds_final_block STRING, last_final_block STRING, latest_protocol_version NUMBER, next_bp_hash STRING, next_epoch_id STRING, outcome_root STRING, prev_hash STRING, prev_height NUMBER, prev_state_root STRING, random_value STRING, rent_paid FLOAT, signature STRING, total_supply NUMBER, validator_proposals VARIANT, validator_reward NUMBER, fact_blocks_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ)"
options: |
NOT NULL
RETURNS NULL ON NULL INPUT
VOLATILE
COMMENT = $$Returns the block data for a given block height. If to_latest is true, it will continue fetching blocks until the latest block. Otherwise, it will fetch blocks until the block_id height is reached.$$
sql: |
{{ near_live_view_fact_blocks(schema, blockchain, network) | indent(4) -}}

{%- endmacro -%}
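
Once deployed, the table function can be queried directly. A hypothetical invocation, assuming the near_mainnet schema produced by this config and an illustrative block height:

```sql
-- Fetch a single block (to_latest = FALSE stops at the given height)
SELECT block_id, block_timestamp, block_hash, tx_count
FROM TABLE(near_mainnet.tf_fact_blocks(100000000, FALSE))
ORDER BY block_id;
```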

178 changes: 178 additions & 0 deletions macros/near/near_live_views.sql
@@ -0,0 +1,178 @@
-- Get Near Chain Head
{% macro near_live_view_latest_block_height() %}
{#
    This macro retrieves the latest block height from the NEAR blockchain via the
    `block` JSON-RPC method and derives the block height range to fetch.

    It takes no arguments; `block_id` and `to_latest` referenced below are the
    parameters of the enclosing tf_fact_blocks table function.

    Returns:
        SQL select list producing latest_block_height, min_height, and max_height
#}
SELECT
live.udf_api(
'https://rpc.mainnet.near.org',
utils.udf_json_rpc_call(
'block',
{'finality': 'final'}
)
):data AS result,
result:result:header:height::integer as latest_block_height,
coalesce(
block_id,
latest_block_height
) as min_height,
iff(
coalesce(to_latest, false),
latest_block_height,
min_height
) as max_height
{% endmacro %}
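
For reference, the rendered request is a JSON-RPC call against the public mainnet endpoint; the payload shape below is an assumption based on the JSON-RPC 2.0 convention, not taken from the udf_json_rpc_call source:

```sql
-- Assumed request body produced by utils.udf_json_rpc_call('block', {'finality': 'final'}):
--   {"jsonrpc": "2.0", "id": <generated>, "method": "block", "params": {"finality": "final"}}
-- The chain head height is then read from result:header:height in the response data.
```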


{% macro near_live_view_udf_get_block_data() %}
{#
    This macro generates the body of a Python UDF that reads NEAR block data from
    Snowflake staged files mapped to the Near Lake.

    Returns:
        Python function that reads a staged block file:
        - Input: file_url (str) - Scoped file URL to block.json
        - Output: str - Raw JSON text of the block, parsed downstream with PARSE_JSON

    Dependencies:
        - snowflake.snowpark.files
        - Stage read permissions
#}
from snowflake.snowpark.files import SnowflakeFile

def process_file(file_url: str) -> str:
    """
    Read a single block data file from the specified stage file URL.

    Args:
        file_url (str): The stage file URL created using BUILD_SCOPED_FILE_URL

    Returns:
        str: The raw JSON contents of the block file

    Note:
        - The file must contain valid NEAR blockchain block data in JSON format
        - If the file cannot be opened or read, the UDF raises and Snowflake
          surfaces the error to the caller
    """
    with SnowflakeFile.open(file_url) as file:
        return file.read()
{% endmacro %}
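
A sketch of how the generated UDF is called once deployed, assuming the near_mainnet schema and the stage referenced later in this file; the zero-padded path corresponds to block height 100000000:

```sql
SELECT PARSE_JSON(
    near_mainnet.udf_get_block_data(
        BUILD_SCOPED_FILE_URL(
            '@streamline.bronze.near_lake_data_mainnet',
            '000100000000/block.json'
        )
    )
) AS block_data;
```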

{% macro near_live_view_get_spine(table_name) %}
{#
This macro generates a spine table for block height ranges, it creates a sequence of block heights between `min_height` and `max_height`using
Snowflake's generator function.

Args:
table_name (str): Reference to a CTE or table that contains:
- block_id: Starting block height
- min_height: Minimum block height to generate
- max_height: Maximum block height to generate
- latest_block_height: Current chain head block height

#}
SELECT
block_height,
ROW_NUMBER() OVER (ORDER BY block_height) - 1 as partition_num
FROM
(
SELECT
            row_number() over (order by seq4()) - 1 + COALESCE(min_height, 0)::integer as block_height,
min_height,
max_height
FROM
table(generator(ROWCOUNT => 1000)),
{{ table_name }}
QUALIFY block_height BETWEEN min_height AND max_height
)
{% endmacro %}
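
As a worked example (values illustrative): with min_height = 100 and max_height = 102, the spine resolves to three rows:

```sql
-- block_height | partition_num
-- 100          | 0
-- 101          | 1
-- 102          | 2
-- Candidate heights outside [min_height, max_height] are dropped by the QUALIFY
-- clause, and the generator caps each invocation at 1000 candidate heights.
```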


{% macro near_live_view_get_raw_block_data(spine_table, schema) %}
{#
This macro generates SQL that retrieves raw block data from the Near Lake data stored in Snowflake external stage.

It constructs URLs for block data files and uses a table function to fetch and parse the JSON data.

The macro performs two main operations:

1. Generates scoped file URLs for each block height using the Near Lake file naming convention
2. Calls the tf_get_block_data function to fetch and parse the block JSON data

Args:
spine_table (str): Reference to a CTE or table containing:
- block_height (INTEGER): The block heights to fetch data for

Returns:
str: A SQL query that returns a table with columns:
- block_height (INTEGER): The height of the block
- block_data (VARIANT): The parsed JSON data for the block

Note:
- Requires access to '@streamline.bronze.near_lake_data_mainnet' stage
- Block files must follow the format: XXXXXXXXXXXX/block.json where X is the zero-padded block height
- Uses the tf_get_block_data table function to parse JSON data
#}

WITH block_urls AS (
SELECT
partition_num,
BUILD_SCOPED_FILE_URL(
'@streamline.bronze.near_lake_data_mainnet',
CONCAT(LPAD(TO_VARCHAR(block_height), 12, '0'), '/block.json')
) as url
FROM {{spine_table}}
)
SELECT
partition_num,
url,
    PARSE_JSON({{ schema -}}.udf_get_block_data(url::STRING)) as block_data
FROM block_urls

{% endmacro %}

-- Get Near fact data
{% macro get_fact_blocks_transformations(schema) %}
{% set get_view_sql %}
SELECT GET_DDL('VIEW', '{{ schema }}.fact_blocks')
{% endset %}

{% set results = run_query(get_view_sql) %}
{% set view_ddl = results.rows[0][0] %}

-- Find the inner SELECT with all the transformations
{% set select_pos = view_ddl.find('SELECT\n block_id') %}
{% set from_pos = view_ddl.find('\nFROM\n blocks') %}

-- Extract just the transformations part
{% set transformations = view_ddl[select_pos + 6:from_pos].strip() %}

{% do log("Extracted transformations: " ~ transformations, info=True) %}

{{ return(transformations) }}
{% endmacro %}
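
To illustrate the string surgery (the DDL shape is assumed, not taken from a live deployment): given a view definition like the one sketched below, the macro returns only the column expressions between the SELECT keyword and the FROM clause:

```sql
-- GET_DDL('VIEW', '<schema>.fact_blocks') output (assumed shape):
--   CREATE OR REPLACE VIEW ... AS
--   SELECT
--       block_id,
--       block_data :header :hash :: STRING AS block_hash,
--       ...
--   FROM
--       blocks
-- The macro slices out everything after 'SELECT' and before 'FROM blocks',
-- i.e. the column transformation list, and strips surrounding whitespace.
```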


{% macro near_live_view_fact_blocks(schema, blockchain, network) %}
WITH heights AS (
{{ near_live_view_latest_block_height() | indent(4) }}
),
spine AS (
{{ near_live_view_get_spine('heights') | indent(4) }}
),
raw_blocks AS (
{{ near_live_view_get_raw_block_data('spine', schema) | indent(4) }}
)
SELECT
{{ get_fact_blocks_transformations(schema) }}
FROM raw_blocks

{% endmacro %}
6 changes: 6 additions & 0 deletions models/deploy/near/near__mainnet.sql
@@ -0,0 +1,6 @@
-- depends_on: {{ ref('near_models','core__fact_blocks') }}
{%- set configs = [
config_near_high_level_abstractions
] -%}

{{- ephemeral_deploy(configs) -}}
7 changes: 7 additions & 0 deletions models/tests/test_fact_blocks_transformations.sql
@@ -0,0 +1,7 @@
-- models/tests/test_fact_blocks_transformations.sql
{{ config(
materialized = 'view',
tags = ['near_models','core','override']
) }}

SELECT * FROM {{ ref('near_models', 'core__fact_blocks') }}
4 changes: 2 additions & 2 deletions models/doc_descriptions/integration/__overview__.md
@@ -46,9 +46,9 @@ Within the same directory (`models/deploy/marketplace/`), create a new folder fo

Deploy your model following the standard deployment procedures.

-- `dbt run -s models/deploy/marketplace/your_model/your_model__.sql -t dev --vars '{"UPDATE_UDFS_AND_SPS":True}'`
+- `dbt run -s models/deploy/marketplace/your_model/your_model__.sql -t dev --vars '{"LQ_UPDATE_UDFS_AND_SPS":True}'`

-- `dbt test -s models/deploy/marketplace/your_model/your_model__.sql -t dev --vars '{"UPDATE_UDFS_AND_SPS":True}'`
+- `dbt test -s models/deploy/marketplace/your_model/your_model__.sql -t dev --vars '{"LQ_UPDATE_UDFS_AND_SPS":True}'`

#### Additional Tips:

7 changes: 6 additions & 1 deletion models/sources.yml
@@ -13,4 +13,9 @@ sources:
    schema: core
    tables:
      - name: fact_event_logs
-      - name: ez_decoded_event_logs
+      - name: ez_decoded_event_logs
+  - name: near_models
+    database: "{{ 'near' if target.database == 'LIVEQUERY' else 'near_dev' }}"
+    schema: core
+    tables:
+      - name: fact_blocks
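
Models can then reference the new source with dbt's two-argument source() call; the resolved database follows the target-dependent expression above:

```sql
-- Resolves to near.core.fact_blocks on LIVEQUERY, near_dev.core.fact_blocks elsewhere
SELECT *
FROM {{ source('near_models', 'fact_blocks') }}
LIMIT 10;
```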
12 changes: 12 additions & 0 deletions package-lock.yml
@@ -0,0 +1,12 @@
packages:
- package: calogica/dbt_expectations
version: 0.8.2
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/near-models-nlq.git
revision: f1edb8a0a851ca6e1b6297405122cc486f5c928b
- package: calogica/dbt_date
version: 0.7.2
- package: get-select/dbt_snowflake_query_tags
version: 2.5.0
sha1_hash: 146672df7bff15c1dc58878075c1b99e298ff914