Skip to content

Commit

Permalink
Add an example for creating an automation to mark zombie flow runs as…
Browse files Browse the repository at this point in the history
… crashed (#16425)
  • Loading branch information
desertaxle authored Dec 18, 2024
1 parent 068ea22 commit 8287333
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions docs/v3/automate/events/automations-triggers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,70 @@ In the above example:
- `user_id_1` creates and then completes an order, triggering a run of our deployment.
- `user_id_2` creates an order, but no completed event is emitted so no deployment is triggered.

### Detect and respond to zombie flows

If the infrastructure a flow is running on suddenly fails (for example, the machine crashes or a container is evicted),
Prefect's orchestration engine will be unable to report state changes and the flow run will get stuck in the running state.

Fortunately, flow runs triggered via deployment can emit heartbeats as they are running, and a Prefect automation can update
a flow run's state to crashed if the server stops receiving heartbeats for that flow run.

<Note>
**Enable flow run heartbeat events**

You will need to ensure you're running Prefect version 3.1.8 or greater and set `PREFECT_RUNNER_HEARTBEAT_FREQUENCY`
to an integer greater than 30 to emit flow run heartbeat events.
</Note>

To create an automation that marks zombie flow runs as crashed, run this script:
```python
from datetime import timedelta
from prefect.automations import Automation
from prefect.client.schemas.objects import StateType
from prefect.events.actions import ChangeFlowRunState
from prefect.events.schemas.automations import EventTrigger, Posture
from prefect.events.schemas.events import ResourceSpecification
my_automation = Automation(
name="Crash zombie flows",
trigger=EventTrigger(
after={"prefect.flow-run.heartbeat"},
expect={
"prefect.flow-run.heartbeat",
"prefect.flow-run.Completed",
"prefect.flow-run.Failed",
"prefect.flow-run.Cancelled",
"prefect.flow-run.Crashed",
},
match=ResourceSpecification({"prefect.resource.id": ["prefect.flow-run.*"]}),
for_each={"prefect.resource.id"},
posture=Posture.Proactive,
threshold=1,
within=timedelta(seconds=90),
),
actions=[
ChangeFlowRunState(
state=StateType.CRASHED,
message="Flow run marked as crashed due to missing heartbeats.",
)
],
)
if __name__ == "__main__":
my_automation.create()
```

The trigger definition says after each heartbeat event for a flow run we expect to see another heartbeat event or a
terminal state event for that same flow run within 90 seconds of a heartbeat event.

If `PREFECT_RUNNER_HEARTBEAT_FREQUENCY` is set to `30`, the automation will trigger only after 3 heartbeats have been missed.
You can adjust `within` in the trigger definition and `PREFECT_RUNNER_HEARTBEAT_FREQUENCY` to change how quickly the automation
will fire after the server stops receiving flow run heartbeats.

You can also add additional actions to your automation to send a notification when zombie runs are detected.

## See also

- To learn more about Prefect events, which can trigger automations, see the [events docs](/v3/automate/events/events/).
Expand Down

0 comments on commit 8287333

Please sign in to comment.