From 8287333eb50f2a8391931a0a2abea6215eedf9d2 Mon Sep 17 00:00:00 2001
From: Alexander Streed
Date: Wed, 18 Dec 2024 06:55:07 -0800
Subject: [PATCH] Add an example for creating an automation to mark zombie flow runs as crashed (#16425)

---
 .../automate/events/automations-triggers.mdx  | 64 +++++++++++++++++++
 1 file changed, 64 insertions(+)

diff --git a/docs/v3/automate/events/automations-triggers.mdx b/docs/v3/automate/events/automations-triggers.mdx
index 56a916a2cba2..4a24c84389e9 100644
--- a/docs/v3/automate/events/automations-triggers.mdx
+++ b/docs/v3/automate/events/automations-triggers.mdx
@@ -905,6 +905,70 @@ In the above example:
 - `user_id_1` creates and then completes an order, triggering a run of our deployment.
 - `user_id_2` creates an order, but no completed event is emitted so no deployment is triggered.
 
+### Detect and respond to zombie flows
+
+If the infrastructure a flow is running on suddenly fails (for example, the machine crashes or a container is evicted),
+Prefect's orchestration engine will be unable to report state changes and the flow run will get stuck in the running state.
+
+Fortunately, flow runs triggered via deployment can emit heartbeats as they are running, and a Prefect automation can update
+a flow run's state to crashed if the server stops receiving heartbeats for that flow run.
+
+
+**Enable flow run heartbeat events**
+
+You will need to ensure you're running Prefect version 3.1.8 or greater and set `PREFECT_RUNNER_HEARTBEAT_FREQUENCY`
+to an integer greater than 30 to emit flow run heartbeat events.
+
+
+To create an automation that marks zombie flow runs as crashed, run this script:
+```python
+from datetime import timedelta
+
+from prefect.automations import Automation
+from prefect.client.schemas.objects import StateType
+from prefect.events.actions import ChangeFlowRunState
+from prefect.events.schemas.automations import EventTrigger, Posture
+from prefect.events.schemas.events import ResourceSpecification
+
+
+my_automation = Automation(
+    name="Crash zombie flows",
+    trigger=EventTrigger(
+        after={"prefect.flow-run.heartbeat"},
+        expect={
+            "prefect.flow-run.heartbeat",
+            "prefect.flow-run.Completed",
+            "prefect.flow-run.Failed",
+            "prefect.flow-run.Cancelled",
+            "prefect.flow-run.Crashed",
+        },
+        match=ResourceSpecification({"prefect.resource.id": ["prefect.flow-run.*"]}),
+        for_each={"prefect.resource.id"},
+        posture=Posture.Proactive,
+        threshold=1,
+        within=timedelta(seconds=90),
+    ),
+    actions=[
+        ChangeFlowRunState(
+            state=StateType.CRASHED,
+            message="Flow run marked as crashed due to missing heartbeats.",
+        )
+    ],
+)
+
+if __name__ == "__main__":
+    my_automation.create()
+```
+
+The trigger definition says after each heartbeat event for a flow run we expect to see another heartbeat event or a
+terminal state event for that same flow run within 90 seconds of a heartbeat event.
+
+If `PREFECT_RUNNER_HEARTBEAT_FREQUENCY` is set to `30`, the automation will trigger only after 3 heartbeats have been missed.
+You can adjust `within` in the trigger definition and `PREFECT_RUNNER_HEARTBEAT_FREQUENCY` to change how quickly the automation
+will fire after the server stops receiving flow run heartbeats.
+
+You can also add additional actions to your automation to send a notification when zombie runs are detected.
+
 ## See also
 
 - To learn more about Prefect events, which can trigger automations, see the [events docs](/v3/automate/events/events/).
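
Note on the timing claim in the added docs: the relationship between `within` and `PREFECT_RUNNER_HEARTBEAT_FREQUENCY` can be checked with a little arithmetic. The sketch below is not part of the patch or of Prefect; `missed_heartbeats_before_trigger` is a hypothetical helper. It assumes heartbeats arrive at exact intervals with no jitter or delivery delay, so treat the result as an approximation.

```python
from datetime import timedelta


def missed_heartbeats_before_trigger(within: timedelta, heartbeat_frequency: int) -> int:
    """Count the whole heartbeat intervals that fit inside the trigger window.

    Hypothetical helper for illustration only: `within` mirrors the trigger's
    `within` argument and `heartbeat_frequency` mirrors the value of
    PREFECT_RUNNER_HEARTBEAT_FREQUENCY, in seconds.
    """
    if heartbeat_frequency <= 0:
        raise ValueError("heartbeat_frequency must be a positive number of seconds")
    # Each full interval that elapses without an event is one missed heartbeat.
    return int(within.total_seconds() // heartbeat_frequency)


if __name__ == "__main__":
    # The first pair matches the values used in the docs; the others are made up.
    for frequency, window_seconds in [(30, 90), (45, 90), (60, 300)]:
        missed = missed_heartbeats_before_trigger(
            timedelta(seconds=window_seconds), frequency
        )
        print(f"frequency={frequency}s within={window_seconds}s -> {missed} missed heartbeats")
```

With the values from the docs (a frequency of 30 seconds and a 90 second window), the helper returns 3, matching the "3 heartbeats have been missed" statement. A shorter window or a longer frequency lowers the number of missed heartbeats tolerated before the automation fires.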