Skip to content

Commit

Permalink
its all soup
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Dec 8, 2023
1 parent 9a1ea2e commit 709de96
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
25 changes: 13 additions & 12 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,8 @@ def consume_events(self) -> Iterator[DagsterEvent]:
def has_events(self) -> bool:
return bool(self._events)

def log_event(self, event: UserEvent, step_execution_context: StepExecutionContext) -> None:
if isinstance(event, AssetMaterialization):
self._events.append(DagsterEvent.asset_materialization(step_execution_context, event))
elif isinstance(event, AssetObservation):
self._events.append(DagsterEvent.asset_observation(step_execution_context, event))
elif isinstance(event, ExpectationResult):
self._events.append(DagsterEvent.step_expectation_result(step_execution_context, event))
else:
check.failed(f"Unexpected event {event}")
def log_event(self, event: DagsterEvent):
self._events.append(event)

@property
def requires_typed_event_stream(self) -> bool:
Expand Down Expand Up @@ -549,9 +542,17 @@ def log_event(self, event: UserEvent) -> None:
def log_materialization(context):
context.log_event(AssetMaterialization("foo"))
"""
self.execution_properties.log_event(
event=event, step_execution_context=self._step_execution_context
)
if isinstance(event, AssetMaterialization):
dagster_event = DagsterEvent.asset_materialization(self._step_execution_context, event)
elif isinstance(event, AssetObservation):
dagster_event = DagsterEvent.asset_observation(self._step_execution_context, event)
elif isinstance(event, ExpectationResult):
dagster_event = DagsterEvent.step_expectation_result(
self._step_execution_context, event
)
else:
check.failed(f"Unexpected event {event}")
self.execution_properties.log_event(event=dagster_event)

@public
def add_output_metadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ def invoke_compute_fn(
config_arg_cls: Optional[Type[Config]],
resource_args: Optional[Dict[str, str]] = None,
) -> Any:
# TODO - this is a possible execution pathway for both direct invocation and normal execution. Need to figure
# out the implications for the context
args_to_pass = {**kwargs}
if config_arg_cls:
# config_arg_cls is either a Config class or a primitive type
Expand Down

0 comments on commit 709de96

Please sign in to comment.