
Commit

[graphql] consolidate instigator has & get calls on to workspace context (#25445)

Refactor to use selectors and do point lookups against the workspace context, allowing different context implementations more flexibility.
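
A minimal before-and-after sketch of the pattern this refactor applies (function names here are illustrative; the selector and context calls mirror the diff below):

```python
# Before: each resolver walked the workspace tree itself, pairing has_* checks
# with get_* calls at every level (returning None on a miss, as the next-tick
# resolvers did).
def get_sensor_before(context, selector):
    location = context.get_code_location(selector.location_name)
    repository = location.get_repository(selector.repository_name)
    if not repository.has_sensor(selector.sensor_name):
        return None
    return repository.get_sensor(selector.sensor_name)


# After: the resolver hands the selector to the workspace context and does a
# single point lookup; the context returns None when the sensor is unknown.
def get_sensor_after(context, selector):
    return context.get_sensor(selector)
```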

## How I Tested These Changes

existing coverage
alangenfeld authored Oct 25, 2024
1 parent 6dc16aa commit de37b8d
Showing 15 changed files with 120 additions and 124 deletions.

6 changes: 5 additions & 1 deletion js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

@@ -4,15 +4,11 @@
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRun

from dagster_graphql.implementation.external import (
get_full_remote_job_or_raise,
get_remote_job_or_raise,
)
from dagster_graphql.implementation.external import get_full_remote_job_or_raise
from dagster_graphql.implementation.utils import JobSubsetSelector, UserFacingGraphQLError
from dagster_graphql.schema.util import ResolveInfo

if TYPE_CHECKING:
from dagster_graphql.schema.pipelines.pipeline import GraphenePipeline
from dagster_graphql.schema.pipelines.pipeline_ref import GrapheneUnknownPipeline
from dagster_graphql.schema.pipelines.snapshot import GraphenePipelineSnapshot

@@ -68,18 +64,6 @@ def _get_job_snapshot_from_instance(
return GraphenePipelineSnapshot(historical_pipeline)


def get_job_or_error(graphene_info: ResolveInfo, selector: JobSubsetSelector) -> "GraphenePipeline":
"""Returns a PipelineOrError."""
return get_job_from_selector(graphene_info, selector)


def get_job_or_raise(graphene_info: ResolveInfo, selector: JobSubsetSelector) -> "GraphenePipeline":
"""Returns a Pipeline or raises a UserFacingGraphQLError if one cannot be retrieved
from the selector, e.g., the pipeline is not present in the loaded repository.
"""
return get_job_from_selector(graphene_info, selector)


def get_job_reference_or_raise(
graphene_info: ResolveInfo, dagster_run: DagsterRun
) -> Union["GraphenePipelineSnapshot", "GrapheneUnknownPipeline"]:
@@ -100,13 +84,3 @@ def get_job_reference_or_raise(
return _get_job_snapshot_from_instance(
graphene_info.context.instance, dagster_run.job_snapshot_id
)


def get_job_from_selector(
graphene_info: ResolveInfo, selector: JobSubsetSelector
) -> "GraphenePipeline":
from dagster_graphql.schema.pipelines.pipeline import GraphenePipeline

check.inst_param(selector, "selector", JobSubsetSelector)

return GraphenePipeline(get_remote_job_or_raise(graphene_info, selector))
@@ -32,14 +32,17 @@
def start_schedule(
graphene_info: ResolveInfo, schedule_selector: ScheduleSelector
) -> "GrapheneScheduleStateResult":
from dagster_graphql.schema.errors import GrapheneScheduleNotFoundError
from dagster_graphql.schema.instigation import GrapheneInstigationState
from dagster_graphql.schema.schedules import GrapheneScheduleStateResult

check.inst_param(schedule_selector, "schedule_selector", ScheduleSelector)
location = graphene_info.context.get_code_location(schedule_selector.location_name)
repository = location.get_repository(schedule_selector.repository_name)
schedule = graphene_info.context.get_schedule(schedule_selector)
if not schedule:
raise UserFacingGraphQLError(
GrapheneScheduleNotFoundError(schedule_name=schedule_selector.schedule_name)
)

schedule = repository.get_schedule(schedule_selector.schedule_name)
stored_state = graphene_info.context.instance.start_schedule(schedule)
schedule_state = schedule.get_current_instigator_state(stored_state)

@@ -145,7 +148,6 @@ def get_schedules_or_error(
results = [
GrapheneSchedule(
schedule,
repository.handle,
schedule_states_by_name.get(schedule.name),
batch_loader,
)
@@ -176,7 +178,7 @@ def get_schedules_for_pipeline(
schedule.selector_id,
)

results.append(GrapheneSchedule(schedule, repository.handle, schedule_state))
results.append(GrapheneSchedule(schedule, schedule_state))

return results

@@ -188,20 +190,17 @@ def get_schedule_or_error(
from dagster_graphql.schema.schedules import GrapheneSchedule

check.inst_param(schedule_selector, "schedule_selector", ScheduleSelector)
location = graphene_info.context.get_code_location(schedule_selector.location_name)
repository = location.get_repository(schedule_selector.repository_name)

if not repository.has_schedule(schedule_selector.schedule_name):
schedule = graphene_info.context.get_schedule(schedule_selector)
if not schedule:
raise UserFacingGraphQLError(
GrapheneScheduleNotFoundError(schedule_name=schedule_selector.schedule_name)
)

schedule = repository.get_schedule(schedule_selector.schedule_name)

schedule_state = graphene_info.context.instance.get_instigator_state(
schedule.get_remote_origin_id(), schedule.selector_id
)
return GrapheneSchedule(schedule, repository.handle, schedule_state)
return GrapheneSchedule(schedule, schedule_state)


def get_schedule_next_tick(
@@ -213,22 +212,16 @@ def get_schedule_next_tick(
return None

repository_origin = schedule_state.origin.repository_origin
if not graphene_info.context.has_code_location(
repository_origin.code_location_origin.location_name
):
return None
code_location = graphene_info.context.get_code_location(
repository_origin.code_location_origin.location_name
selector = ScheduleSelector(
location_name=repository_origin.code_location_origin.location_name,
repository_name=repository_origin.repository_name,
schedule_name=schedule_state.name,
)
if not code_location.has_repository(repository_origin.repository_name):
return None

repository = code_location.get_repository(repository_origin.repository_name)

if not repository.has_schedule(schedule_state.name):
schedule = graphene_info.context.get_schedule(selector)
if not schedule:
return None

schedule = repository.get_schedule(schedule_state.name)
time_iter = schedule.execution_time_iterator(time.time())

next_timestamp = next(time_iter).timestamp()
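
The point lookups the schedule resolvers above now rely on leave room for workspace-context implementations to resolve a selector however they like. A hypothetical default, written purely in terms of the older has_*/get_* chain that this diff removes from the individual fetch functions, might look like the sketch below; the real method on Dagster's workspace context may be structured differently:

```python
# Hypothetical sketch only: a workspace-context point lookup expressed with the
# location/repository calls the resolvers used before this change. It returns
# None when any level of the lookup misses, matching how the updated resolvers
# treat a missing schedule.
def get_schedule(self, selector):
    if not self.has_code_location(selector.location_name):
        return None
    location = self.get_code_location(selector.location_name)
    if not location.has_repository(selector.repository_name):
        return None
    repository = location.get_repository(selector.repository_name)
    if not repository.has_schedule(selector.schedule_name):
        return None
    return repository.get_schedule(selector.schedule_name)
```

A remote or cached context could instead answer the same selector with a single RPC or index hit, which is the flexibility the commit description calls out.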
@@ -63,7 +63,6 @@ def get_sensors_or_error(
results=[
GrapheneSensor(
sensor,
repository.handle,
sensor_states_by_name.get(sensor.name),
batch_loader,
)
@@ -77,18 +76,17 @@ def get_sensor_or_error(graphene_info: ResolveInfo, selector: SensorSelector) ->
from dagster_graphql.schema.sensors import GrapheneSensor

check.inst_param(selector, "selector", SensorSelector)
location = graphene_info.context.get_code_location(selector.location_name)
repository = location.get_repository(selector.repository_name)

if not repository.has_sensor(selector.sensor_name):
sensor = graphene_info.context.get_sensor(selector)
if not sensor:
raise UserFacingGraphQLError(GrapheneSensorNotFoundError(selector.sensor_name))
sensor = repository.get_sensor(selector.sensor_name)

sensor_state = graphene_info.context.instance.get_instigator_state(
sensor.get_remote_origin_id(),
sensor.selector_id,
)

return GrapheneSensor(sensor, repository.handle, sensor_state)
return GrapheneSensor(sensor, sensor_state)


def start_sensor(graphene_info: ResolveInfo, sensor_selector: SensorSelector) -> "GrapheneSensor":
@@ -97,13 +95,12 @@ def start_sensor(graphene_info: ResolveInfo, sensor_selector: SensorSelector) ->

check.inst_param(sensor_selector, "sensor_selector", SensorSelector)

location = graphene_info.context.get_code_location(sensor_selector.location_name)
repository = location.get_repository(sensor_selector.repository_name)
if not repository.has_sensor(sensor_selector.sensor_name):
sensor = graphene_info.context.get_sensor(sensor_selector)
if not sensor:
raise UserFacingGraphQLError(GrapheneSensorNotFoundError(sensor_selector.sensor_name))
sensor = repository.get_sensor(sensor_selector.sensor_name)

sensor_state = graphene_info.context.instance.start_sensor(sensor)
return GrapheneSensor(sensor, repository.handle, sensor_state)
return GrapheneSensor(sensor, sensor_state)


def stop_sensor(
@@ -147,16 +144,13 @@ def reset_sensor(graphene_info: ResolveInfo, sensor_selector: SensorSelector) ->
from dagster_graphql.schema.sensors import GrapheneSensor

check.inst_param(sensor_selector, "sensor_selector", SensorSelector)

location = graphene_info.context.get_code_location(sensor_selector.location_name)
repository = location.get_repository(sensor_selector.repository_name)
if not repository.has_sensor(sensor_selector.sensor_name):
sensor = graphene_info.context.get_sensor(sensor_selector)
if not sensor:
raise UserFacingGraphQLError(GrapheneSensorNotFoundError(sensor_selector.sensor_name))

sensor = repository.get_sensor(sensor_selector.sensor_name)
sensor_state = graphene_info.context.instance.reset_sensor(sensor)

return GrapheneSensor(sensor, repository.handle, sensor_state)
return GrapheneSensor(sensor, sensor_state)


def get_sensors_for_pipeline(
@@ -179,7 +173,7 @@ def get_sensors_for_pipeline(
sensor.get_remote_origin_id(),
sensor.selector_id,
)
results.append(GrapheneSensor(sensor, repository.handle, sensor_state))
results.append(GrapheneSensor(sensor, sensor_state))

return results

@@ -190,27 +184,18 @@ def get_sensor_next_tick(
from dagster_graphql.schema.instigation import GrapheneDryRunInstigationTick

check.inst_param(sensor_state, "sensor_state", InstigatorState)

repository_origin = sensor_state.origin.repository_origin
if not graphene_info.context.has_code_location(
repository_origin.code_location_origin.location_name
):
if not sensor_state.is_running:
return None

code_location = graphene_info.context.get_code_location(
repository_origin.code_location_origin.location_name
repository_origin = sensor_state.origin.repository_origin
selector = SensorSelector(
location_name=repository_origin.code_location_origin.location_name,
repository_name=repository_origin.repository_name,
sensor_name=sensor_state.name,
)
if not code_location.has_repository(repository_origin.repository_name):
return None

repository = code_location.get_repository(repository_origin.repository_name)

if not repository.has_sensor(sensor_state.name):
return None

sensor = repository.get_sensor(sensor_state.name)

if not sensor_state.is_running:
sensor = graphene_info.context.get_sensor(selector)
if not sensor:
return None

ticks = graphene_info.context.instance.get_ticks(
@@ -235,13 +220,11 @@ def set_sensor_cursor(
from dagster_graphql.schema.errors import GrapheneSensorNotFoundError
from dagster_graphql.schema.sensors import GrapheneSensor

location = graphene_info.context.get_code_location(selector.location_name)
repository = location.get_repository(selector.repository_name)

if not repository.has_sensor(selector.sensor_name):
sensor = graphene_info.context.get_sensor(selector)
if not sensor:
raise UserFacingGraphQLError(GrapheneSensorNotFoundError(selector.sensor_name))

instance = graphene_info.context.instance
sensor = repository.get_sensor(selector.sensor_name)
stored_state = instance.get_instigator_state(
sensor.get_remote_origin_id(),
sensor.selector_id,
@@ -264,4 +247,4 @@ def set_sensor_cursor(
else:
instance.update_instigator_state(updated_state)

return GrapheneSensor(sensor, repository.handle, updated_state)
return GrapheneSensor(sensor, updated_state)
@@ -861,13 +861,16 @@ def resolve_targetingInstigators(self, graphene_info: ResolveInfo) -> Sequence[G
if isinstance(self._remote_node, RemoteWorkspaceAssetNode):
# global nodes have saved references to their targeting instigators
schedules = [
graphene_info.context.get_schedule(schedule_handle)
schedule
for schedule_handle in self._remote_node.get_targeting_schedule_handles()
if (schedule := graphene_info.context.get_schedule(schedule_handle)) is not None
]
sensors = [
graphene_info.context.get_sensor(sensor_handle)
sensor
for sensor_handle in self._remote_node.get_targeting_sensor_handles()
if (sensor := graphene_info.context.get_sensor(sensor_handle)) is not None
]

else:
# fallback to using the repository
repo = graphene_info.context.get_repository(self._repository_selector)
@@ -883,7 +886,6 @@ def resolve_targetingInstigators(self, graphene_info: ResolveInfo) -> Sequence[G
results.append(
GrapheneSensor(
sensor,
sensor.handle.repository_handle,
sensor_state,
)
)
@@ -896,7 +898,6 @@ def resolve_targetingInstigators(self, graphene_info: ResolveInfo) -> Sequence[G
results.append(
GrapheneSchedule(
schedule,
schedule.handle.repository_handle,
schedule_state,
)
)
@@ -311,7 +311,6 @@ def resolve_schedules(self, graphene_info: ResolveInfo):
[
GrapheneSchedule(
schedule,
repository.handle,
batch_loader.get_schedule_state(schedule.name),
batch_loader,
)
@@ -326,7 +325,6 @@ def resolve_sensors(self, graphene_info: ResolveInfo, sensorType: Optional[Senso
return [
GrapheneSensor(
sensor,
repository.handle,
batch_loader.get_sensor_state(sensor.name),
batch_loader,
)

1 comment on commit de37b8d

@github-actions

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-j488qw0lg-elementl.vercel.app

Built with commit de37b8d.
This pull request is being automatically deployed with vercel-action
