diff --git a/.env.test b/.env.test index e50fa0d..591c023 100644 --- a/.env.test +++ b/.env.test @@ -2,4 +2,9 @@ CLICKHOUSE_URL CLICKHOUSE_PORT CLICKHOUSE_USER CLICKHOUSE_PASSWORD -CLICKHOUSE_SECURE \ No newline at end of file +CLICKHOUSE_SECURE + +POSTGRES_URL +POSTGRES_PORT +POSTGRES_USER +POSTGRES_PASSWORD \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 903f632..7f6be4d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,6 +27,9 @@ RUN pip install --user -r /app/requirements.txt # Copy dbt project COPY dbt_project.yml /app/dbt_project.yml +# Copy profiles.yml to the .dbt directory in the user's home +COPY profiles.yml /home/appuser/.dbt/profiles.yml + # Copy macros, models and seeds COPY /macros /app/macros COPY /models /app/models @@ -43,4 +46,4 @@ ENV DBT_PROJECT_PATH /app/src EXPOSE 8080 # Set PATH to include user-level binaries -ENV PATH=/home/appuser/.local/bin:$PATH +ENV PATH=/home/appuser/.local/bin:$PATH \ No newline at end of file diff --git a/README.md b/README.md index 6be551e..8716084 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,9 @@ This dbt project is designed to facilitate the transformation and analysis of Gn - `seeds/`: Seed files that contain static data used as inputs for models. +``` +docker exec -it dbt /bin/bash +``` ## Contributing diff --git a/dbt_project.yml b/dbt_project.yml index 8a0e044..69df343 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -2,7 +2,7 @@ name: 'gnosis_dbt' version: '1.0.0' config-version: 2 -profile: 'gnosis_dbt' # This should match the profile name in profiles.yml +profile: 'gnosis_dbt' model-paths: ["models"] analysis-paths: ["analyses"] diff --git a/docker-compose.yml b/docker-compose.yml index fc3ca5d..e766237 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,17 +18,19 @@ services: CLICKHOUSE_URL: ${CLICKHOUSE_URL} CLICKHOUSE_PORT: ${CLICKHOUSE_PORT} CLICKHOUSE_SECURE: ${CLICKHOUSE_SECURE} + POSTGRES_URL: ${POSTGRES_URL} + POSTGRES_PORT: ${POSTGRES_PORT} + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} volumes: - type: bind source: ./ target: /app consistency: cached - - ./profiles.yml:/home/appuser/.dbt/profiles.yml working_dir: /app user: appuser networks: - dbt_net - networks: - dbt_net: + dbt_net: \ No newline at end of file diff --git a/macros/consensus/_postgresql/create_bytea_to_bigint_function.sql b/macros/consensus/_postgresql/create_bytea_to_bigint_function.sql deleted file mode 100644 index e367c81..0000000 --- a/macros/consensus/_postgresql/create_bytea_to_bigint_function.sql +++ /dev/null @@ -1,30 +0,0 @@ --- models/_postgresql/create_bytea_to_bigint_function.sql - -{% macro create_bytea_to_numeric_function() %} - -CREATE OR REPLACE FUNCTION bytea_to_numeric(bytes bytea) RETURNS numeric AS $$ -DECLARE - result numeric := 0; - byte_value int; - byte_count int; -BEGIN - IF bytes IS NULL THEN - RETURN NULL; - END IF; - - byte_count := octet_length(bytes); - - IF byte_count = 0 THEN - RETURN 0; - END IF; - - FOR i IN 0..byte_count-1 LOOP - byte_value := get_byte(bytes, i); - result := result * 256 + byte_value; - END LOOP; - - RETURN result; -END; -$$ LANGUAGE plpgsql; - -{% endmacro %} \ No newline at end of file diff --git a/macros/consensus/_postgresql/create_deserialize_eth2_function.sql b/macros/consensus/_postgresql/create_deserialize_eth2_function.sql deleted file mode 100644 index 4b7f89e..0000000 --- a/macros/consensus/_postgresql/create_deserialize_eth2_function.sql +++ /dev/null @@ -1,19 +0,0 @@ --- models/_postgresql/create_deserialize_eth2_function.sql - -{% macro create_deserialize_eth2_function() %} - -CREATE OR REPLACE FUNCTION deserialize_eth2(eth2 bytea) -RETURNS TABLE ( - fork_digest text, - next_fork_version text, - next_fork_epoch numeric -) AS $$ -BEGIN - RETURN QUERY SELECT - encode(substring(eth2 from 1 for 4), 'hex') as fork_digest, - encode(substring(eth2 from 5 for 4), 'hex') as next_fork_version, - bytea_to_numeric(substring(eth2 from 9 for 8)) as next_fork_epoch; -END; -$$ LANGUAGE plpgsql; - -{% endmacro %} \ No newline at end of file diff --git a/macros/consensus/bytea_to_bigint.sql b/macros/consensus/bytea_to_bigint.sql deleted file mode 100644 index 78a7ccf..0000000 --- a/macros/consensus/bytea_to_bigint.sql +++ /dev/null @@ -1,12 +0,0 @@ -{% macro bytea_to_bigint(bytea_input) %} - cast( - bitwise_left_shift(cast(from_base(to_hex(substr({{ bytea_input }}, 1, 1)), 16) as bigint), 56) + - bitwise_left_shift(cast(from_base(to_hex(substr({{ bytea_input }}, 2, 1)), 16) as bigint), 48) + - bitwise_left_shift(cast(from_base(to_hex(substr({{ bytea_input }}, 3, 1)), 16) as bigint), 40) + - bitwise_left_shift(cast(from_base(to_hex(substr({{ bytea_input }}, 4, 1)), 16) as bigint), 32) + - bitwise_left_shift(cast(from_base(to_hex(substr({{ bytea_input }}, 5, 1)), 16) as bigint), 24) + - bitwise_left_shift(cast(from_base(to_hex(substr({{ bytea_input }}, 6, 1)), 16) as bigint), 16) + - bitwise_left_shift(cast(from_base(to_hex(substr({{ bytea_input }}, 7, 1)), 16) as bigint), 8) + - cast(from_base(to_hex(substr({{ bytea_input }}, 8, 1)), 16) as bigint) - as bigint) -{% endmacro %} diff --git a/macros/consensus/deserialize_eth2.sql b/macros/consensus/deserialize_eth2.sql deleted file mode 100644 index d983c86..0000000 --- a/macros/consensus/deserialize_eth2.sql +++ /dev/null @@ -1,30 +0,0 @@ -/* -FROM: https://notes.ethereum.org/@vbuterin/Sys3GLJbD - -def compute_fork_digest(current_version: Version, genesis_validators_root: Root) -> ForkDigest: - """ - Return the 4-byte fork digest for the ``current_version`` and ``genesis_validators_root``. - This is a digest primarily used for domain separation on the p2p layer. - 4-bytes suffices for practical separation of forks/chains. - """ - return ForkDigest(compute_fork_data_root(current_version, genesis_validators_root)[:4]) - - -fork_diges: - is compute_fork_digest(current_fork_version, genesis_validators_root) where current_fork_version - is the fork version at the node's current epoch defined by the wall-clock time - (not necessarily the epoch to which the node is sync) genesis_validators_root is the static Root - found in state.genesis_validators_root; -next_fork_version: - is the fork version corresponding to the next planned hard fork at a future epoch. - If no future fork is planned, set next_fork_version = current_fork_version to signal this fact; -next_fork_epoch: - is the epoch at which the next fork is planned and the current_fork_version will be updated. - If no future fork is planned, set next_fork_epoch = FAR_FUTURE_EPOCH to signal this fact; -*/ - -{% macro deserialize_eth2(eth2) %} - substr({{ eth2 }}, 1, 4) as fork_digest, - substr({{ eth2 }}, 5, 4) as next_fork_version, - {{ bytea_to_bigint('substr(' ~ eth2 ~ ', 9, 8)') }} as next_fork_epoch -{% endmacro %} \ No newline at end of file diff --git a/macros/consensus/node_count_over_time.sql b/macros/consensus/node_count_over_time.sql deleted file mode 100644 index 07f3f74..0000000 --- a/macros/consensus/node_count_over_time.sql +++ /dev/null @@ -1,70 +0,0 @@ -{% macro node_count_over_time(group_by_column, source_model, grouping_column_name='grouping_column', res = 'hour') %} - -{% set current_timestamp %} - SELECT DATE_TRUNC('hour', MAX(last_seen)) AS current_hour FROM {{ source_model }} -{% endset %} - - -WITH NodeStatus AS ( - SELECT - node_id, - last_seen, - last_seen_lead, - CASE - WHEN {{ group_by_column }} IS NULL THEN 'UNKNOWN' - ELSE {{ group_by_column }} - END AS {{ grouping_column_name }}, - status - FROM {{ source_model }} - WHERE last_seen < ({{ current_timestamp }}) -), - - -active AS ( - SELECT - datetime - ,{{ grouping_column_name }} - ,COUNT(*) AS cnt - FROM ( - SELECT DISTINCT - node_id - ,{{ grouping_column_name }} - ,GENERATE_SERIES(DATE_TRUNC('{{res}}', last_seen), DATE_TRUNC('{{res}}', last_seen_lead), '1 {{res}}'::INTERVAL) AS datetime - FROM NodeStatus - WHERE status = 'active' - ) t - GROUP BY 1, 2 -), - -calendar AS ( - SELECT - t1.datetime, - t2.{{ grouping_column_name }} - FROM ( - SELECT generate_series( - (SELECT MIN(datetime) FROM active), - ({{ current_timestamp }}), - '1 {{res}}'::interval - ) AS datetime - ) t1 - CROSS JOIN (SELECT DISTINCT {{ grouping_column_name }} FROM active) t2 -), - -final AS ( - SELECT - t1.datetime - ,t1.{{ grouping_column_name }} - ,t2.cnt - FROM calendar t1 - LEFT JOIN active t2 - ON t1.datetime = t2.datetime - AND t1.{{ grouping_column_name }} = t2.{{ grouping_column_name }} -) - -SELECT * FROM final -WHERE cnt IS NOT NULL -{% if is_incremental() %} -AND datetime > (SELECT MAX(datetime) FROM {{ this }}) -{% endif %} - -{% endmacro %} \ No newline at end of file diff --git a/macros/consensus/node_count_over_time_old.sql b/macros/consensus/node_count_over_time_old.sql deleted file mode 100644 index 20e83c8..0000000 --- a/macros/consensus/node_count_over_time_old.sql +++ /dev/null @@ -1,135 +0,0 @@ -{% macro node_count_over_time_old(group_by_column, source_model, grouping_column_name='grouping_column') %} - -{% set current_timestamp %} - SELECT DATE_TRUNC('hour', MAX(last_seen)) AS current_hour FROM {{ source_model }} -{% endset %} - - -WITH NodeStatus AS ( - SELECT - node_id, - last_seen, - last_seen_lead, - {{ group_by_column }} AS {{ grouping_column_name }}, - status - FROM {{ source_model }} - WHERE last_seen < ({{ current_timestamp }}) -), - -first_time AS ( - SELECT - node_id, - {{ group_by_column }} AS {{ grouping_column_name }}, - MIN(DATE_TRUNC('hour', last_seen)) AS datetime - FROM {{ source_model }} - WHERE - last_seen < ({{ current_timestamp }}) - GROUP BY node_id, {{ grouping_column_name }} -), - -allnodes AS ( - SELECT - datetime, - {{ grouping_column_name }}, - COUNT(*) AS cnt - FROM first_time - GROUP BY 1, 2 -), - - -hourly_data AS ( - SELECT - DATE_TRUNC('hour', last_seen) as datetime, - {{ grouping_column_name }}, - node_id - FROM NodeStatus - WHERE status = 'inactive' -), -ranked_data AS ( - SELECT - datetime, - {{ grouping_column_name }}, - node_id, - ROW_NUMBER() OVER (PARTITION BY {{ grouping_column_name }}, node_id ORDER BY datetime) as rn - FROM hourly_data -), -first_appearance AS ( - SELECT - datetime, - {{ grouping_column_name }}, - node_id - FROM ranked_data - WHERE rn = 1 -), - - - -inactive AS ( -SELECT - datetime, - {{ grouping_column_name }}, - cnt -FROM ( - SELECT - datetime, - {{ grouping_column_name }}, - COUNT(*) OVER (PARTITION BY {{ grouping_column_name }} ORDER BY datetime) as cnt, - ROW_NUMBER(*) OVER (PARTITION BY {{ grouping_column_name }} ORDER BY datetime) as rn -FROM first_appearance -) t -WHERE rn = 1 - -), - - -max_date AS ( - SELECT MAX(datetime) as max_datetime FROM allnodes -), - -calendar AS ( - SELECT - t1.datetime, - t2.{{ grouping_column_name }} - FROM ( - SELECT generate_series( - (SELECT MIN(datetime) FROM allnodes), - ({{ current_timestamp }}), - '1 hour'::interval - ) AS datetime - ) t1 - CROSS JOIN (SELECT DISTINCT {{ grouping_column_name }} FROM allnodes) t2 -), - -nodes_cumulative AS ( - SELECT - t1.datetime, - t1.{{ grouping_column_name }}, - SUM(COALESCE(t2.cnt,0)) OVER (PARTITION BY t1.{{ grouping_column_name }} ORDER BY t1.datetime) AS cnt - FROM calendar t1 - LEFT JOIN allnodes t2 - ON t2.datetime = t1.datetime - AND t2.{{ grouping_column_name }} = t1.{{ grouping_column_name }} -), - - -final AS ( - SELECT - t1.datetime, - t1.{{ grouping_column_name }}, - t2.cnt - COALESCE(t3.cnt,0) AS cnt - FROM calendar t1 - INNER JOIN nodes_cumulative t2 - ON t1.datetime = t2.datetime - AND t1.{{ grouping_column_name }} = t2.{{ grouping_column_name }} - LEFT JOIN inactive t3 - ON t3.datetime = t1.datetime - AND t3.{{ grouping_column_name }} = t1.{{ grouping_column_name }} -) - -SELECT * FROM final -WHERE cnt IS NOT NULL -{% if is_incremental() %} -AND datetime > (SELECT MAX(datetime) FROM {{ this }}) -{% endif %} - -{% endmacro %} \ No newline at end of file diff --git a/macros/db/clickhouse_utils.sql b/macros/db/clickhouse_utils.sql new file mode 100644 index 0000000..6de0d2a --- /dev/null +++ b/macros/db/clickhouse_utils.sql @@ -0,0 +1,17 @@ +{% macro generate_schema_name(custom_schema_name, node) -%} + {%- set default_schema = target.schema -%} + {%- if custom_schema_name is none -%} + {{ default_schema }} + {%- else -%} + {{ custom_schema_name | trim }} + {%- endif -%} +{%- endmacro %} + +{% macro generate_database_name(custom_database_name, node) -%} + {%- set default_database = target.schema -%} + {%- if custom_database_name is none -%} + {{ default_database }} + {%- else -%} + {{ custom_database_name | trim }} + {%- endif -%} +{%- endmacro %} \ No newline at end of file diff --git a/macros/db/postgres_queries.sql b/macros/db/postgres_queries.sql new file mode 100644 index 0000000..f192221 --- /dev/null +++ b/macros/db/postgres_queries.sql @@ -0,0 +1,9 @@ +{% macro get_postgres(pg_db,table_name) %} + postgresql( + '{{ env_var("POSTGRES_URL", "postgres") }}:{{ env_var("POSTGRES_PORT", "5432") }}', + '{{ pg_db }}', + '{{ table_name }}', + '{{ env_var("POSTGRES_USER") }}', + '{{ env_var("POSTGRES_PASSWORD") }}' + ) +{% endmacro %} diff --git a/macros/p2p/valtack_nodes_activity.sql b/macros/p2p/valtack_nodes_activity.sql new file mode 100644 index 0000000..ab20dbc --- /dev/null +++ b/macros/p2p/valtack_nodes_activity.sql @@ -0,0 +1,125 @@ +{% macro valtack_nodes_activity(resolution='day', aggregation_column=none) %} + +{% set valid_resolutions = ['hour', 'day', 'week', 'month'] %} + +{% if resolution not in valid_resolutions %} + {{ exceptions.raise_compiler_error("Invalid resolution. Choose from: " ~ valid_resolutions | join(', ')) }} +{% endif %} + +{% set valid_columns = ['city', 'country', 'asn_type', 'asn_organization'] %} + +{% if aggregation_column is not none and aggregation_column not in valid_columns %} + {{ exceptions.raise_compiler_error("Invalid aggregation column. Choose from: " ~ valid_columns | join(', ')) }} +{% endif %} + +{{ config( + materialized='incremental', + unique_key='date' if aggregation_column is none else ['date', aggregation_column], + incremental_strategy='delete+insert', + on_schema_change='sync_all_columns' +) }} + +WITH + +combined_events AS ( + SELECT + pe.enr, + pe.ip, + me.timestamp AS timestamp + FROM + {{ source('valtrack','peer_discovered_events') }} pe + INNER JOIN + {{ source('valtrack','metadata_received_events') }} me + ON pe.enr = me.enr + WHERE + me.timestamp < date_trunc('{{ resolution }}', now()) + {% if is_incremental() %} + AND me.timestamp >= (SELECT max(date) FROM {{ this }}) + {% endif %} +), + +{% if aggregation_column is not none %} +nodes_data AS ( + SELECT + ce.timestamp, + ce.enr, + ce.ip, + im.city, + im.country, + im.latitude, + im.longitude, + im.asn_organization, + im.asn_type + FROM + combined_events ce + INNER JOIN + {{ source('valtrack','ip_metadata') }} im + ON ce.ip = im.ip +), +{% endif %} + +date_series AS ( + SELECT arrayJoin(arrayMap( + d -> date_trunc('{{ resolution }}', toDateTime(min(timestamp))) + INTERVAL d {{ resolution }}, + range(toUInt32(dateDiff('{{ resolution }}', date_trunc('{{ resolution }}', toDateTime(min(timestamp))), date_trunc('{{ resolution }}', now())))) + )) AS date + FROM {% if aggregation_column is not none %}nodes_data{% else %}combined_events{% endif %} +), + +{% if aggregation_column is not none %} +distinct_values AS ( + SELECT DISTINCT {{ aggregation_column }} + FROM nodes_data +), + +date_value_combinations AS ( + SELECT + ds.date, + dv.{{ aggregation_column }} + FROM + date_series ds + CROSS JOIN + distinct_values dv +), +{% endif %} + +active_nodes AS ( + SELECT + date_trunc('{{ resolution }}', timestamp) AS date, + {% if aggregation_column is not none %} + {{ aggregation_column }}, + {% endif %} + countDistinct(enr) AS active_node_count + FROM {% if aggregation_column is not none %}nodes_data{% else %}combined_events{% endif %} + GROUP BY date{% if aggregation_column is not none %}, {{ aggregation_column }}{% endif %} +) + +SELECT + {% if aggregation_column is not none %} + dvc.date, + dvc.{{ aggregation_column }}, + {% else %} + ds.date, + {% endif %} + COALESCE(an.active_node_count, 0) AS active_nodes +FROM + {% if aggregation_column is not none %} + date_value_combinations dvc + LEFT JOIN + active_nodes an + ON dvc.date = an.date AND dvc.{{ aggregation_column }} = an.{{ aggregation_column }} + {% else %} + date_series ds + LEFT JOIN + active_nodes an + ON ds.date = an.date + {% endif %} +WHERE + {% if aggregation_column is not none %}dvc.date{% else %}ds.date{% endif %} < date_trunc('{{ resolution }}', now()) + {% if is_incremental() %} + AND {% if aggregation_column is not none %}dvc.date{% else %}ds.date{% endif %} > (SELECT max(date) FROM {{ this }}) + {% endif %} +ORDER BY + {% if aggregation_column is not none %}dvc.date, dvc.{{ aggregation_column }}{% else %}ds.date{% endif %} + +{% endmacro %} \ No newline at end of file diff --git a/models/transformations/power_consumption/gnosis_carbon_emissions.sql b/models/metrics/carbon_ratings/gnosis_carbon_emissions.sql similarity index 100% rename from models/transformations/power_consumption/gnosis_carbon_emissions.sql rename to models/metrics/carbon_ratings/gnosis_carbon_emissions.sql diff --git a/models/transformations/power_consumption/gnosis_power_consumption.sql b/models/metrics/carbon_ratings/gnosis_power_consumption.sql similarity index 78% rename from models/transformations/power_consumption/gnosis_power_consumption.sql rename to models/metrics/carbon_ratings/gnosis_power_consumption.sql index 4b15cc0..0ebebbc 100644 --- a/models/transformations/power_consumption/gnosis_power_consumption.sql +++ b/models/metrics/carbon_ratings/gnosis_power_consumption.sql @@ -77,17 +77,16 @@ best_guess_per_client AS ( ), configuration_distribution AS ( - SELECT * FROM ( - VALUES - ('Erigon', 'Lighthouse', 0.340), - ('Erigon', 'Teku', 0.114), - ('Erigon', 'Lodestar', 0.044), - ('Erigon', 'Nimbus', 0.002), - ('Nethermind', 'Lighthouse', 0.340), - ('Nethermind', 'Teku', 0.114), - ('Nethermind', 'Lodestar', 0.044), - ('Nethermind', 'Nimbus', 0.002) - ) AS t(execution_client, consensus_client, frac) + SELECT + execution_client + ,consensus_client + ,frac + FROM ( + SELECT + arrayJoin(['Erigon', 'Erigon', 'Erigon', 'Erigon', 'Nethermind', 'Nethermind', 'Nethermind', 'Nethermind']) AS execution_client, + arrayJoin(['Lighthouse', 'Teku', 'Lodestar', 'Nimbus', 'Lighthouse', 'Teku', 'Lodestar', 'Nimbus']) AS consensus_client, + arrayJoin([0.340, 0.114, 0.044, 0.002, 0.340, 0.114, 0.044, 0.002]) AS frac + ) ), power_best_guess AS ( diff --git a/models/metrics/p2p/gnosis_p2p_nodes_geo_last_day.sql b/models/metrics/p2p/gnosis_p2p_nodes_geo_last_day.sql new file mode 100644 index 0000000..3b1a414 --- /dev/null +++ b/models/metrics/p2p/gnosis_p2p_nodes_geo_last_day.sql @@ -0,0 +1,26 @@ +{{ config( + materialized='table', + unique_key='enr', +) }} + +SELECT DISTINCT + t1.enr + ,t2.city + ,t2.country + ,t2.latitude + ,t2.longitude + ,t2.asn_organization + ,t2.asn_type +FROM + {{ source('valtrack','metadata_received_events') }} t1 +INNER JOIN + {{ source('valtrack','peer_discovered_events') }} t3 + ON + t3.enr = t1.enr +INNER JOIN + {{ source('valtrack','ip_metadata') }} t2 + ON t2.ip = t3.ip +WHERE + t1.timestamp >= date_trunc('day', now() - INTERVAL 5 DAY) + AND + t1.timestamp < date_trunc('day', now()- INTERVAL 4 DAY) \ No newline at end of file diff --git a/models/sources/p2p/p2p_sources.yml b/models/sources/p2p/p2p_sources.yml index 9bcb346..e2fcbe5 100644 --- a/models/sources/p2p/p2p_sources.yml +++ b/models/sources/p2p/p2p_sources.yml @@ -1,11 +1,12 @@ version: 2 sources: - - name: gnosis_xatu - database: gnosis_xatu - schema: public + - name: valtrack + schema: valtrack tables: - - name: node_record + - name: metadata_received_events + - name: peer_discovered_events + - name: ip_metadata - name: gnosis_chaind database: gnosis_chaind @@ -14,4 +15,3 @@ sources: - name: t_proposer_duties - name: t_blocks - name: t_attestations - diff --git a/models/transformations/attestations/gnosis_consensus_attestations.sql b/models/transformations/attestations/gnosis_consensus_attestations.sql index c16826f..887d3e9 100644 --- a/models/transformations/attestations/gnosis_consensus_attestations.sql +++ b/models/transformations/attestations/gnosis_consensus_attestations.sql @@ -1,52 +1,59 @@ --- models/gnosis/gnosis_enr_ranked_records.sql {{ config( - materialized='table', - engine='AggregatingMergeTree()', - partition_by='toYYYYMMDD(f_inclusion_slot)', - order_by='(f_inclusion_slot, inc_dist_cohort)', - primary_key='(f_inclusion_slot, inc_dist_cohort)' - ) + materialized='table', + engine='MergeTree()', + order_by='(f_inclusion_slot, inc_dist_cohort)', + primary_key='(f_inclusion_slot, inc_dist_cohort)' + ) }} - WITH -total_solts AS ( +total_slots AS ( SELECT f_slot FROM - {{ source('gnosis_chaind', 't_proposer_duties') }} + {{ get_postgres('gnosis_chaind', 't_proposer_duties') }} + LIMIT 100 ), -proposed_solts AS ( +proposed_slots AS ( SELECT f_slot FROM - {{ source('gnosis_chaind', 't_blocks') }} + {{ get_postgres('gnosis_chaind', 't_blocks') }} + LIMIT 100 +), + +attestations AS ( + SELECT + f_inclusion_slot, + f_slot, + f_inclusion_index + FROM + {{ get_postgres('gnosis_chaind', 't_attestations') }} + LIMIT 100 ), inclusion_distance AS ( - SELECT - a.f_inclusion_slot - ,a.f_slot - ,a.f_inclusion_index - ,COUNT(p.f_slot) AS inc_dist_cohort - FROM - {{ source('gnosis_chaind', 't_attestations') }} a - LEFT JOIN - proposed_solts p - ON - p.f_slot>a.f_slot AND p.f_slot<=a.f_inclusion_slot - GROUP BY 1, 2, 3 + SELECT + a.f_inclusion_slot, + a.f_slot, + a.f_inclusion_index, + COUNT(DISTINCT p.f_slot) AS inc_dist_cohort + FROM + attestations a + CROSS JOIN + proposed_slots p + WHERE + p.f_slot > a.f_slot AND p.f_slot <= a.f_inclusion_slot + GROUP BY 1, 2, 3 ) SELECT - f_inclusion_slot - --DATE_TRUNC('hour', compute_timestamp_at_slot(f_inclusion_slot)) AS timestamp - ,inc_dist_cohort - ,COUNT(*) AS cnt + f_inclusion_slot, + inc_dist_cohort, + COUNT(*) AS cnt FROM - inclusion_distance -GROUP BY 1, 2 - + inclusion_distance +GROUP BY 1, 2 \ No newline at end of file diff --git a/models/transformations/p2p/gnosis_enr_ranked_records.sql b/models/transformations/p2p/gnosis_enr_ranked_records.sql deleted file mode 100644 index 1f3f74e..0000000 --- a/models/transformations/p2p/gnosis_enr_ranked_records.sql +++ /dev/null @@ -1,70 +0,0 @@ --- models/gnosis/gnosis_enr_ranked_records.sql - -{{ - config( - materialized='table' - ) -}} - -WITH - -gnosis_enr AS ( - SELECT - * - FROM ( - SELECT - enr, - encode(fork_digest, 'hex') As fork_digest, - encode(next_fork_version, 'hex') As next_fork_version, - next_fork_epoch - FROM - {{ source('gnosis_xatu', 'node_record') }} t1, - LATERAL deserialize_eth2(t1.eth2) - ) t - WHERE - next_fork_version IN ('01000064','02000064','03000064','04000064','05000064') -), - -RankedRecords AS ( - SELECT - t1.*, - ROW_NUMBER() OVER (PARTITION BY t1.ip4 ORDER BY t1.create_time DESC) AS rn, - ROW_NUMBER() OVER (PARTITION BY t1.node_id ORDER BY t1.create_time DESC) AS rn2 - FROM - {{ source('gnosis_xatu', 'node_record') }} t1 - INNER JOIN - gnosis_enr t2 ON t2.enr = t1.enr -) -SELECT - create_time - last_dial_time - consecutive_dial_attempts - last_connect_time - encode(signature, 'hex') AS signature - geo_longitude - geo_latitude - geo_autonomous_system_number - secp256k1 - ip4 - ip6 - tcp4 - udp4 - tcp6 - udp6 - eth2 - attnets - syncnets - seq - geo_autonomous_system_organization - id - node_id - peer_id - geo_city - geo_country - geo_country_code - geo_continent_code - enr -FROM - RankedRecords -WHERE - rn = 1 \ No newline at end of file diff --git a/models/transformations/p2p/gnosis_enr_ranked_records_ch.sql b/models/transformations/p2p/gnosis_enr_ranked_records_ch.sql deleted file mode 100644 index 3716480..0000000 --- a/models/transformations/p2p/gnosis_enr_ranked_records_ch.sql +++ /dev/null @@ -1,14 +0,0 @@ --- models/gnosis/gnosis_enr_ranked_records.sql -{{ - config( - materialized='table' - ) -}} - - SELECT - t1.enr - ,t1.peer_id - ,t1.geo_city - FROM - {{ source('gnosis_xatu', 'node_record') }} AS t1 - diff --git a/models/transformations/p2p/gnosis_enr_ranked_records_trino.sql b/models/transformations/p2p/gnosis_enr_ranked_records_trino.sql deleted file mode 100644 index 58d9b8a..0000000 --- a/models/transformations/p2p/gnosis_enr_ranked_records_trino.sql +++ /dev/null @@ -1,8 +0,0 @@ --- models/gnosis/gnosis_enr_ranked_records.sql -{{ config(materialized='table') }} - - SELECT - t1.* - FROM - {{ source('gnosis_xatu', 'node_record') }} AS t1 - diff --git a/models/transformations/p2p/gnosis_p2p_network.yml b/models/transformations/p2p/gnosis_p2p_network.yml index d87cff9..254e9fe 100644 --- a/models/transformations/p2p/gnosis_p2p_network.yml +++ b/models/transformations/p2p/gnosis_p2p_network.yml @@ -32,7 +32,37 @@ models: name: status description: "Active/Inactive node" + - name: gnosis_p2p_nodes_count + meta: + blockchain: gnosis + sector: p2p + contributors: hdser + config: + tags: ['p2p', 'gnosis', 'hdser'] + description: > + Block resolution xDAI balances and balances diff for each address on gnosis. + Depends on transfers_gnosis_xdai + columns: + - &date + name: date + description: "Last time node was seen in the network" + - &active_nodes + name: active_nodes + description: "Lead Last time node was seen in the network" + + - name: gnosis_p2p_nodes + meta: + blockchain: gnosis + sector: p2p + contributors: hdser + config: + tags: ['p2p', 'gnosis', 'hdser'] + description: > + Block resolution xDAI balances and balances diff for each address on gnosis. + Depends on transfers_gnosis_xdai + + - name: test meta: blockchain: gnosis sector: p2p diff --git a/models/transformations/p2p/gnosis_p2p_nodes_cnt.sql b/models/transformations/p2p/gnosis_p2p_nodes_cnt.sql deleted file mode 100644 index 013908d..0000000 --- a/models/transformations/p2p/gnosis_p2p_nodes_cnt.sql +++ /dev/null @@ -1,61 +0,0 @@ -{{ config(materialized='incremental', unique_key='datetime') }} - -WITH - - -NodeStatus AS ( - SELECT - node_id - ,last_seen - ,last_seen_lead - ,status - FROM - {{ ref('gnosis_p2p_nodes_status') }} -), - - -first_time AS ( - SELECT - node_id - ,MIN(DATE_TRUNC('hour', last_seen)) as datetime - FROM - NodeStatus - GROUP BY - 1 -), - -allnodes AS ( - - SELECT - datetime - ,COUNT(*) AS cnt - FROM - first_time - GROUP BY 1 - -), - -inactive AS ( - SELECT - DATE_TRUNC('hour', last_seen_lead) as datetime - ,COUNT(DISTINCT node_id) AS cnt - FROM - NodeStatus - WHERE - status = 'inactive' - GROUP BY - 1 -) - - -SELECT - t1.datetime - ,(SUM(t1.cnt) OVER (ORDER BY t1.datetime)) - COALESCE(t2.cnt,0) AS cnt -FROM - allnodes t1 -LEFT JOIN - inactive t2 - ON - t2.datetime = t1.datetime -ORDER BY - 1 ASC \ No newline at end of file diff --git a/models/transformations/p2p/gnosis_p2p_nodes_status.sql b/models/transformations/p2p/gnosis_p2p_nodes_status.sql index cccdc94..03803d0 100644 --- a/models/transformations/p2p/gnosis_p2p_nodes_status.sql +++ b/models/transformations/p2p/gnosis_p2p_nodes_status.sql @@ -1,117 +1,53 @@ {{ config(materialized='incremental', unique_key='last_seen') }} {% set activity_buffer = '72 hours' %} -{% set chain_code = '00064' %} WITH gnosis_nodes AS ( - SELECT - enr - ,node_id - ,create_time AS last_seen - ,COALESCE(LEAD(create_time) OVER (PARTITION BY enr ORDER BY create_time), CURRENT_TIMESTAMP) AS last_seen_lead - ,geo_longitude - ,geo_latitude - ,geo_autonomous_system_organization - ,geo_city - ,geo_country - ,geo_country_code - ,geo_continent_code - ,fork_digest - ,next_fork_version - ,CASE - WHEN next_fork_version = CONCAT('010','{{ chain_code }}') THEN 'ALTAIR' - WHEN next_fork_version = CONCAT('020','{{ chain_code }}') THEN 'BELLATRIX' - WHEN next_fork_version = CONCAT('030','{{ chain_code }}') THEN 'CAPELLA' - WHEN next_fork_version = CONCAT('040','{{ chain_code }}') THEN 'DENEB' - WHEN next_fork_version = CONCAT('050','{{ chain_code }}') THEN 'ELECTRA' - END next_fork_label - ,next_fork_epoch - ,CASE - WHEN LOWER(geo_autonomous_system_organization) ~* 'amazon' THEN 'Amazon' - WHEN LOWER(geo_autonomous_system_organization) ~* 'google' THEN 'Google' - WHEN LOWER(geo_autonomous_system_organization) ~* 'microsoft' THEN 'Microsoft' - WHEN LOWER(geo_autonomous_system_organization) ~* 'oracle' THEN 'Oracle' - WHEN LOWER(geo_autonomous_system_organization) ~* 'alibaba' THEN 'Alibaba' - WHEN LOWER(geo_autonomous_system_organization) ~* 'huawei clouds' THEN 'Huawei' - WHEN LOWER(geo_autonomous_system_organization) ~* 'hetzner' THEN 'Hetzner' - WHEN LOWER(geo_autonomous_system_organization) ~* 'contabo' THEN 'Contabo' - WHEN LOWER(geo_autonomous_system_organization) ~* 'ovh sas' THEN 'OVH SAS' - WHEN LOWER(geo_autonomous_system_organization) ~* 'netcup' THEN 'Netcup' - WHEN LOWER(geo_autonomous_system_organization) ~* 'limestone' THEN 'Limestone' - WHEN LOWER(geo_autonomous_system_organization) ~* 'allnodes' THEN 'Allnodes' - WHEN LOWER(geo_autonomous_system_organization) ~* 'teraswitch' THEN 'Teraswitch' - WHEN LOWER(geo_autonomous_system_organization) ~* 'latitude-sh' THEN 'Latitude-sh' - WHEN LOWER(geo_autonomous_system_organization) ~* 'datacamp' THEN 'Datacamp' - ELSE 'Self Hosted' - END AS provider - FROM ( - SELECT - t1.*, - t2.fork_digest, - t2.next_fork_version, - t2.next_fork_epoch - FROM - {{ source('gnosis_xatu','node_record') }} t1, - LATERAL deserialize_eth2(t1.eth2) t2 - ) t - WHERE - next_fork_version LIKE CONCAT('%','{{ chain_code }}') - + SELECT + enr + ,timestamp + ,IF( + timestamp = max(timestamp) OVER (PARTITION BY enr), + now(), + any(timestamp) OVER ( + PARTITION BY enr + ORDER BY timestamp ASC + ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING + ) + ) AS timestamp_lead + FROM ( + SELECT DISTINCT + timestamp + ,enr + FROM + {{ source('valtrack','metadata_received_events') }} + ) ), -NodeStatus AS ( +nodes_active AS ( SELECT * - ,LEAST(last_seen + INTERVAL '{{ activity_buffer }}', last_seen_lead) AS active_until + ,LEAST(timestamp + INTERVAL '{{ activity_buffer }}', timestamp_lead) AS active_until FROM gnosis_nodes ), -SplitStatus AS ( - SELECT - enr, - node_id, - last_seen, - active_until AS last_seen_lead, - 'active' AS status, - geo_longitude, - geo_latitude, - geo_autonomous_system_organization, - geo_city, - geo_country, - geo_country_code, - geo_continent_code, - fork_digest, - next_fork_version, - next_fork_label, - next_fork_epoch, - provider - FROM NodeStatus +nodes_status AS ( + SELECT + * + ,'active' AS status + FROM nodes_active + WHERE active_until = timestamp_lead UNION ALL SELECT - enr, - node_id, - active_until AS last_seen, - last_seen_lead, - 'inactive' AS status, - geo_longitude, - geo_latitude, - geo_autonomous_system_organization, - geo_city, - geo_country, - geo_country_code, - geo_continent_code, - fork_digest, - next_fork_version, - next_fork_label, - next_fork_epoch, - provider - FROM NodeStatus - WHERE active_until < last_seen_lead + * + ,'inactive' AS status + FROM nodes_active + WHERE active_until < timestamp_lead ) -SELECT * FROM SplitStatus \ No newline at end of file +SELECT * FROM nodes_status \ No newline at end of file diff --git a/models/transformations/p2p/p2p_valtrack_nodes_cnt_1h.sql b/models/transformations/p2p/p2p_valtrack_nodes_cnt_1h.sql new file mode 100644 index 0000000..5d5f9a3 --- /dev/null +++ b/models/transformations/p2p/p2p_valtrack_nodes_cnt_1h.sql @@ -0,0 +1 @@ +{{ valtack_nodes_activity(resolution='hour') }} \ No newline at end of file diff --git a/models/transformations/p2p/p2p_valtrack_nodes_cnt_country_1d.sql b/models/transformations/p2p/p2p_valtrack_nodes_cnt_country_1d.sql new file mode 100644 index 0000000..d7e4a7d --- /dev/null +++ b/models/transformations/p2p/p2p_valtrack_nodes_cnt_country_1d.sql @@ -0,0 +1 @@ +{{ valtack_nodes_activity(resolution='day', aggregation_column='country') }} \ No newline at end of file diff --git a/models/transformations/p2p/test.sql b/models/transformations/p2p/test.sql index 77b45ca..f64c896 100644 --- a/models/transformations/p2p/test.sql +++ b/models/transformations/p2p/test.sql @@ -1,68 +1,14 @@ --- models/gnosis/gnosis_enr_ranked_records.sql - -{{ - config( - materialized='table' - ) -}} - -WITH gnosis_enr AS ( - SELECT - t1.enr, - to_hex(t2.fork_digest) AS fork_digest, - to_hex(t2.next_fork_version) AS next_fork_version, - t2.next_fork_epoch - FROM - {{ source('gnosis_xatu', 'node_record') }} t1, - LATERAL (SELECT {{ deserialize_eth2('t1.eth2') }}) AS t2 - WHERE - to_hex(t2.next_fork_version) IN ('01000064', '02000064', '03000064', '04000064', '05000064') -), - -RankedRecords AS ( - SELECT - t1.* - --ROW_NUMBER() OVER (PARTITION BY t1.ip4 ORDER BY t1.create_time DESC) AS rn, - --ROW_NUMBER() OVER (PARTITION BY t1.node_id ORDER BY t1.create_time DESC) AS rn2 - FROM - {{ source('gnosis_xatu', 'node_record') }} t1 - INNER JOIN - gnosis_enr t2 ON t2.enr = t1.enr +{{ config( + materialized='table', + engine='MergeTree()', + order_by='f_slot' +) }} + +WITH postgres_data AS ( + SELECT * + FROM {{ get_postgres('gnosis_chaind','t_blocks') }} + LIMIT 100 ) -SELECT - CAST(create_time AS timestamp) AS create_time - ,CAST(last_dial_time AS timestamp) AS last_dial_time - ,consecutive_dial_attempts - ,CAST(last_connect_time AS timestamp) AS last_connect_time - ,to_hex(signature) AS signature - ,geo_longitude - ,geo_latitude - ,geo_autonomous_system_number - ,secp256k1 - --,ip4 - --,ip6 - ,tcp4 - ,udp4 - ,tcp6 - ,udp6 - ,eth2 - ,attnets - ,syncnets - ,seq - ,geo_autonomous_system_organization - ,id - ,node_id - ,peer_id - ,geo_city - ,geo_country - ,geo_country_code - ,geo_continent_code - ,enr -FROM - RankedRecords ---WHERE --- rn = 1 - -- Additional conditions can be uncommented as needed - -- AND rn2 = 1 - -- AND ip4 = '46.162.82.129' +SELECT * +FROM postgres_data diff --git a/models/transformations/power_consumption/execution_power.sql b/models/transformations/power_consumption/execution_power.sql index ebf4dd7..6e958d5 100644 --- a/models/transformations/power_consumption/execution_power.sql +++ b/models/transformations/power_consumption/execution_power.sql @@ -1,13 +1,15 @@ WITH execution_power AS ( - SELECT * FROM ( - VALUES - (4, 'Erigon', 18.6), - (5, 'Erigon', 17.59), - (6, 'Erigon', 44.62), - (4, 'Nethermind', 18.6), - (5, 'Nethermind', 17.59), - (6, 'Nethermind', 44.62) - ) AS t(type, client, mean) + SELECT + type, + client, + mean + FROM ( + SELECT + arrayJoin([4, 5, 6, 4, 5, 6]) AS type, + arrayJoin(['Erigon', 'Erigon', 'Erigon', 'Nethermind', 'Nethermind', 'Nethermind']) AS client, + arrayJoin([18.6, 17.59, 44.62, 18.6, 17.59, 44.62]) AS mean + + ) ) SELECT * FROM execution_power \ No newline at end of file diff --git a/models/transformations/power_consumption/hardware_config.sql b/models/transformations/power_consumption/hardware_config.sql index 86f7b06..94acac0 100644 --- a/models/transformations/power_consumption/hardware_config.sql +++ b/models/transformations/power_consumption/hardware_config.sql @@ -1,10 +1,28 @@ WITH hardware_config AS ( - SELECT * FROM ( - VALUES - (4, 'Intel i5-1135G7', '4/8', 'x86/x64', '16 GB', '2 TB SSD', 'Onboard', '65 Watt', 'Integrated', 'Ubuntu 20.04'), - (5, 'Intel i5-10400', '6/12', 'x86/x64', '64 GB', '2TB SSD', 'Onboard', '650 Watt', 'Custom', 'Ubuntu 21'), - (6, 'AMD 3970X', '32/64', 'x86/x64', '256 GB', '2TB SSD', 'AM 6970', '1000 Watt', 'Custom', 'Ubuntu 20.04') - ) AS t(type, cpu, cores_threads, architecture, ram, storage, gpu, psu, "case", os) + SELECT + type + ,cpu + ,cores_threads + ,architecture + ,ram + ,storage + ,gpu + ,psu + ,"case" + ,os + FROM ( + SELECT + arrayJoin([4, 5, 6]) AS type, + arrayJoin(['Intel i5-1135G7', 'Intel i5-10400', 'AMD 3970X']) AS cpu, + arrayJoin(['4/8', '6/12', '32/64']) AS cores_threads, + arrayJoin(['x86/x64', 'x86/x64', 'x86/x64']) AS architecture, + arrayJoin(['16 GB', '64 GB', '256 GB']) AS ram, + arrayJoin(['2 TB SSD', '2TB SSD', '2TB SSD']) AS storage, + arrayJoin(['Onboard', 'Onboard', 'AM 6970']) AS gpu, + arrayJoin(['65 Watt', '650 Watt', '1000 Watt']) AS psu, + arrayJoin(['Integrated', 'Custom', 'Custom']) AS "case", + arrayJoin(['Ubuntu 20.04', 'Ubuntu 21', 'Ubuntu 20.04']) AS os + ) ) SELECT * FROM hardware_config \ No newline at end of file diff --git a/models/transformations/power_consumption/idle_electric_power.sql b/models/transformations/power_consumption/idle_electric_power.sql index e7b2fc6..be776a9 100644 --- a/models/transformations/power_consumption/idle_electric_power.sql +++ b/models/transformations/power_consumption/idle_electric_power.sql @@ -1,10 +1,12 @@ WITH idle_electric_power AS ( - SELECT * FROM ( - VALUES - (4, 3.66), - (5, 25.04), - (6, 78.17) - ) AS t(type, mean) + SELECT + type + ,mean + FROM ( + SELECT + arrayJoin([4, 5, 6]) AS type, + arrayJoin([3.66, 25.04, 78.17]) AS mean + ) ) SELECT * FROM idle_electric_power \ No newline at end of file diff --git a/models/transformations/power_consumption/node_distribution.sql b/models/transformations/power_consumption/node_distribution.sql index 9fcf8ff..06fb254 100644 --- a/models/transformations/power_consumption/node_distribution.sql +++ b/models/transformations/power_consumption/node_distribution.sql @@ -1,10 +1,12 @@ WITH node_distribution AS ( - SELECT * FROM ( - VALUES - (4, 0.25), - (5, 0.50), - (6, 0.25) - ) AS t(type, distribution) + SELECT + type + ,distribution + FROM ( + SELECT + arrayJoin([4, 5, 6]) AS type, + arrayJoin([0.25, 0.50, 0.25]) AS distribution + ) ) SELECT * FROM node_distribution \ No newline at end of file diff --git a/profiles.yml b/profiles.yml index 06ef745..a0a13f9 100644 --- a/profiles.yml +++ b/profiles.yml @@ -1,7 +1,7 @@ gnosis_dbt: - target: ch-dbt + target: ch_dbt outputs: - ch-dbt: + ch_dbt: type: clickhouse schema: "dbt" verify: False @@ -11,12 +11,22 @@ gnosis_dbt: user: "{{ env_var('CLICKHOUSE_USER') }}" password: "{{ env_var('CLICKHOUSE_PASSWORD') }}" threads: 40 - connect_timeout: 60 # Increase the connect timeout (in seconds) - read_timeout: 3000 # Increase the read timeout (in seconds) + connect_timeout: 60 + read_timeout: 3000 - ch-valtrack: + gnosis_chaind: + type: postgres + host: "{{ env_var('POSTGRES_HOST', 'postgres') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT', '5432') | int }}" + dbname: "gnosis_chaind" + schema: "public" + threads: 80 + + ch_valtrack: type: clickhouse - schema: "valtrack_preview" + schema: "valtrack" verify: False host: "{{ env_var('CLICKHOUSE_URL') }}" port: "{{ env_var('CLICKHOUSE_PORT', '8123') | int }}" @@ -26,10 +36,10 @@ gnosis_dbt: threads: 40 connect_timeout: 60 read_timeout: 3000 - - ch-goteth: + + ch_goteth: type: clickhouse - schema: "goteth_preview" + schema: "goteth" verify: False host: "{{ env_var('CLICKHOUSE_URL') }}" port: "{{ env_var('CLICKHOUSE_PORT', '8123') | int }}" diff --git a/requirements.txt b/requirements.txt index 041490a..4d2fcd1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ dbt-core==1.8.7 dbt-clickhouse==1.8.4 +dbt-postgres==1.8.2 pytest==8.2.1 \ No newline at end of file