Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: graphql add Run.hasRunMetricsEnabled field #24662

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
RunRecord,
RunsFilter,
)
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, TagType, get_tag_type
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, RUN_METRIC_TAGS, TagType, get_tag_type
from dagster._core.workspace.permissions import Permissions
from dagster._utils.tags import get_boolean_tag_value
from dagster._utils.yaml_utils import dump_run_config_yaml

from dagster_graphql.implementation.events import from_event_record, iterate_metadata_entries
Expand Down Expand Up @@ -82,6 +83,11 @@
from dagster_graphql.schema.asset_graph import GrapheneAssetNode
from dagster_graphql.schema.partition_sets import GrapheneJobSelectionPartition

UNSTARTED_STATUSES = [
DagsterRunStatus.QUEUED,
DagsterRunStatus.NOT_STARTED,
DagsterRunStatus.STARTING,
]

STARTED_STATUSES = {
DagsterRunStatus.STARTED,
Expand Down Expand Up @@ -384,6 +390,7 @@ class GrapheneRun(graphene.ObjectType):
hasConcurrencyKeySlots = graphene.NonNull(graphene.Boolean)
rootConcurrencyKeys = graphene.List(graphene.NonNull(graphene.String))
hasUnconstrainedRootNodes = graphene.NonNull(graphene.Boolean)
hasRunMetricsEnabled = graphene.NonNull(graphene.Boolean)

class Meta:
interfaces = (GraphenePipelineRun, GrapheneRunsFeedEntry)
Expand Down Expand Up @@ -624,6 +631,13 @@ def resolve_rootConcurrencyKeys(self, graphene_info: ResolveInfo):
root_concurrency_keys.extend([concurrency_key] * count)
return root_concurrency_keys

def resolve_hasRunMetricsEnabled(self, graphene_info: ResolveInfo):
if self.dagster_run.status in UNSTARTED_STATUSES:
return False
mlarose marked this conversation as resolved.
Show resolved Hide resolved

run_tags = self.dagster_run.tags
return any(get_boolean_tag_value(run_tags.get(tag)) for tag in RUN_METRIC_TAGS)


class GrapheneIPipelineSnapshotMixin:
# Mixin this class to implement IPipelineSnapshot
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import tempfile

import pytest
import yaml
from dagster import AssetMaterialization, Output, define_asset_job, job, op, repository
from dagster._core.definitions.decorators.asset_decorator import asset
Expand Down Expand Up @@ -292,6 +293,22 @@
}
"""

RUN_METRICS_QUERY = """
query RunQuery($runId: ID!) {
pipelineRunOrError(runId: $runId) {
__typename
... on Run {
runId
tags {
key
value
}
hasRunMetricsEnabled
}
}
}
"""


def _get_runs_data(result, run_id):
for run_data in result.data["pipelineOrError"]["runs"]:
Expand Down Expand Up @@ -981,3 +998,47 @@ def test_run_has_concurrency_slots():
assert result.data["pipelineRunsOrError"]["results"][0]["runId"] == run_id
assert result.data["pipelineRunsOrError"]["results"][0]["hasConcurrencyKeySlots"]
assert result.data["pipelineRunsOrError"]["results"][0]["rootConcurrencyKeys"]


@pytest.mark.parametrize(
"run_tag, run_tag_value, run_metrics_enabled, failure_message",
[
(None, None, False, "run_metrics tag not present should result to false"),
(".dagster/run_metrics", "true", True, "run_metrics tag set to true should result to true"),
(
".dagster/run_metrics",
"false",
False,
"run_metrics tag set to falsy value should result to false",
),
(
"dagster/run_metrics",
"true",
True,
"public run_metrics tag set to true should result to true",
),
],
)
def test_run_has_run_metrics_enabled(run_tag, run_tag_value, run_metrics_enabled, failure_message):
with instance_for_test() as instance:
repo = get_repo_at_time_1()
tags = (
{
run_tag: run_tag_value,
}
if run_tag
else {}
)
run = instance.create_run_for_job(
repo.get_job("foo_job"), status=DagsterRunStatus.STARTED, tags=tags
)

with define_out_of_process_context(__file__, "get_repo_at_time_1", instance) as context:
result = execute_dagster_graphql(
context,
RUN_METRICS_QUERY,
variables={"runId": run.run_id},
)
assert result.data
has_run_metrics_enabled = result.data["pipelineRunOrError"]["hasRunMetricsEnabled"]
assert has_run_metrics_enabled == run_metrics_enabled, failure_message
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/_core/storage/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@
RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
]

# Supports for the public tag is deprecated
RUN_METRIC_TAGS = [
f"{HIDDEN_TAG_PREFIX}run_metrics",
f"{SYSTEM_TAG_PREFIX}run_metrics",
]


class TagType(Enum):
# Custom tag provided by a user
Expand Down