From 16514f6c5be7cced38be9ae6d51dba2f270eddb3 Mon Sep 17 00:00:00 2001 From: Mathieu Larose Date: Mon, 23 Sep 2024 12:14:54 -0400 Subject: [PATCH] feat: graphql add Run.hasRunMetricsEnabled field --- .../ui-core/src/graphql/schema.graphql | 1 + .../packages/ui-core/src/graphql/types.ts | 5 ++ .../schema/pipelines/pipeline.py | 16 ++++- .../graphql/test_runs.py | 61 +++++++++++++++++++ .../dagster/dagster/_core/storage/tags.py | 6 ++ 5 files changed, 88 insertions(+), 1 deletion(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index 6c2398666fb77..1eea13085353f 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -1440,6 +1440,7 @@ type Run implements PipelineRun & RunsFeedEntry { hasConcurrencyKeySlots: Boolean! rootConcurrencyKeys: [String!] hasUnconstrainedRootNodes: Boolean! + hasRunMetricsEnabled: Boolean! } interface RunsFeedEntry { diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index fed74766cb610..234f635c8fddc 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -4435,6 +4435,7 @@ export type Run = PipelineRun & hasConcurrencyKeySlots: Scalars['Boolean']['output']; hasDeletePermission: Scalars['Boolean']['output']; hasReExecutePermission: Scalars['Boolean']['output']; + hasRunMetricsEnabled: Scalars['Boolean']['output']; hasTerminatePermission: Scalars['Boolean']['output']; hasUnconstrainedRootNodes: Scalars['Boolean']['output']; id: Scalars['ID']['output']; @@ -12981,6 +12982,10 @@ export const buildRun = ( overrides && overrides.hasOwnProperty('hasReExecutePermission') ? overrides.hasReExecutePermission! : true, + hasRunMetricsEnabled: + overrides && overrides.hasOwnProperty('hasRunMetricsEnabled') + ? overrides.hasRunMetricsEnabled! + : false, hasTerminatePermission: overrides && overrides.hasOwnProperty('hasTerminatePermission') ? overrides.hasTerminatePermission! diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py index f10e020aacae2..342b9a120652e 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py @@ -19,8 +19,9 @@ RunRecord, RunsFilter, ) -from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type +from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, RUN_METRIC_TAGS, TagType, get_tag_type from dagster._core.workspace.permissions import Permissions +from dagster._utils.tags import get_boolean_tag_value from dagster._utils.yaml_utils import dump_run_config_yaml from dagster_graphql.implementation.events import from_event_record, iterate_metadata_entries @@ -82,6 +83,11 @@ from dagster_graphql.schema.asset_graph import GrapheneAssetNode from dagster_graphql.schema.partition_sets import GrapheneJobSelectionPartition +UNSTARTED_STATUSES = [ + DagsterRunStatus.QUEUED, + DagsterRunStatus.NOT_STARTED, + DagsterRunStatus.STARTING, +] STARTED_STATUSES = { DagsterRunStatus.STARTED, @@ -384,6 +390,7 @@ class GrapheneRun(graphene.ObjectType): hasConcurrencyKeySlots = graphene.NonNull(graphene.Boolean) rootConcurrencyKeys = graphene.List(graphene.NonNull(graphene.String)) hasUnconstrainedRootNodes = graphene.NonNull(graphene.Boolean) + hasRunMetricsEnabled = graphene.NonNull(graphene.Boolean) class Meta: interfaces = (GraphenePipelineRun, GrapheneRunsFeedEntry) @@ -624,6 +631,13 @@ def resolve_rootConcurrencyKeys(self, graphene_info: ResolveInfo): root_concurrency_keys.extend([concurrency_key] * count) return root_concurrency_keys + def resolve_hasRunMetricsEnabled(self, graphene_info: ResolveInfo): + if self.dagster_run.status in UNSTARTED_STATUSES: + return False + + run_tags = self.dagster_run.tags + return any(get_boolean_tag_value(run_tags.get(tag)) for tag in RUN_METRIC_TAGS) + class GrapheneIPipelineSnapshotMixin: # Mixin this class to implement IPipelineSnapshot diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs.py index 5527a8de1f501..17c6095dba23d 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs.py @@ -1,6 +1,7 @@ import copy import tempfile +import pytest import yaml from dagster import AssetMaterialization, Output, define_asset_job, job, op, repository from dagster._core.definitions.decorators.asset_decorator import asset @@ -292,6 +293,22 @@ } """ +RUN_METRICS_QUERY = """ +query RunQuery($runId: ID!) { + pipelineRunOrError(runId: $runId) { + __typename + ... on Run { + runId + tags { + key + value + } + hasRunMetricsEnabled + } + } +} +""" + def _get_runs_data(result, run_id): for run_data in result.data["pipelineOrError"]["runs"]: @@ -981,3 +998,47 @@ def test_run_has_concurrency_slots(): assert result.data["pipelineRunsOrError"]["results"][0]["runId"] == run_id assert result.data["pipelineRunsOrError"]["results"][0]["hasConcurrencyKeySlots"] assert result.data["pipelineRunsOrError"]["results"][0]["rootConcurrencyKeys"] + + +@pytest.mark.parametrize( + "run_tag, run_tag_value, run_metrics_enabled, failure_message", + [ + (None, None, False, "run_metrics tag not present should result to false"), + (".dagster/run_metrics", "true", True, "run_metrics tag set to true should result to true"), + ( + ".dagster/run_metrics", + "false", + False, + "run_metrics tag set to falsy value should result to false", + ), + ( + "dagster/run_metrics", + "true", + True, + "public run_metrics tag set to true should result to true", + ), + ], +) +def test_run_has_run_metrics_enabled(run_tag, run_tag_value, run_metrics_enabled, failure_message): + with instance_for_test() as instance: + repo = get_repo_at_time_1() + tags = ( + { + run_tag: run_tag_value, + } + if run_tag + else {} + ) + run = instance.create_run_for_job( + repo.get_job("foo_job"), status=DagsterRunStatus.STARTED, tags=tags + ) + + with define_out_of_process_context(__file__, "get_repo_at_time_1", instance) as context: + result = execute_dagster_graphql( + context, + RUN_METRICS_QUERY, + variables={"runId": run.run_id}, + ) + assert result.data + has_run_metrics_enabled = result.data["pipelineRunOrError"]["hasRunMetricsEnabled"] + assert has_run_metrics_enabled == run_metrics_enabled, failure_message diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py index b7c7923efe7d5..7dc7031479b67 100644 --- a/python_modules/dagster/dagster/_core/storage/tags.py +++ b/python_modules/dagster/dagster/_core/storage/tags.py @@ -90,6 +90,12 @@ RETRY_ON_ASSET_OR_OP_FAILURE_TAG, ] +# Supports for the public tag is deprecated +RUN_METRIC_TAGS = [ + f"{HIDDEN_TAG_PREFIX}run_metrics", + f"{SYSTEM_TAG_PREFIX}run_metrics", +] + class TagType(Enum): # Custom tag provided by a user