Skip to content

Commit

Permalink
refactor shared argo annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
saikonen committed Nov 27, 2024
1 parent ddf9a3f commit 6ecf354
Showing 1 changed file with 55 additions and 50 deletions.
105 changes: 55 additions & 50 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,54 +352,6 @@ def _shared_kubernetes_annotations(self):
"metaflow/project_flow_name": current.project_flow_name,
}
)

if self._schedule is not None:
# timezone is an optional field and json dumps on None will result in null
# hence configuring it to an empty string
if self._timezone is None:
self._timezone = ""
cron_info = {"schedule": self._schedule, "tz": self._timezone}
annotations.update({"metaflow/cron": json.dumps(cron_info)})

if self.parameters:
annotations.update({"metaflow/parameters": json.dumps(self.parameters)})

# Some more annotations to populate the Argo UI nicely
if self.tags:
annotations.update({"metaflow/tags": json.dumps(self.tags)})
if self.triggers:
annotations.update(
{
"metaflow/triggers": json.dumps(
[
{key: trigger.get(key) for key in ["name", "type"]}
for trigger in self.triggers
]
)
}
)
if self.notify_on_error:
annotations.update(
{
"metaflow/notify_on_error": json.dumps(
{
"slack": bool(self.notify_slack_webhook_url),
"pager_duty": bool(self.notify_pager_duty_integration_key),
}
)
}
)
if self.notify_on_success:
annotations.update(
{
"metaflow/notify_on_success": json.dumps(
{
"slack": bool(self.notify_slack_webhook_url),
"pager_duty": bool(self.notify_pager_duty_integration_key),
}
)
}
)
return annotations

def _get_schedule(self):
Expand Down Expand Up @@ -706,6 +658,55 @@ def _compile_workflow_template(self):
# generate container templates at the top level (in WorkflowSpec) and maintain
# references to them within the DAGTask.

annotations = {}
if self._schedule is not None:
# timezone is an optional field and json dumps on None will result in null
# hence configuring it to an empty string
if self._timezone is None:
self._timezone = ""
cron_info = {"schedule": self._schedule, "tz": self._timezone}
annotations.update({"metaflow/cron": json.dumps(cron_info)})

if self.parameters:
annotations.update({"metaflow/parameters": json.dumps(self.parameters)})

# Some more annotations to populate the Argo UI nicely
if self.tags:
annotations.update({"metaflow/tags": json.dumps(self.tags)})
if self.triggers:
annotations.update(
{
"metaflow/triggers": json.dumps(
[
{key: trigger.get(key) for key in ["name", "type"]}
for trigger in self.triggers
]
)
}
)
if self.notify_on_error:
annotations.update(
{
"metaflow/notify_on_error": json.dumps(
{
"slack": bool(self.notify_slack_webhook_url),
"pager_duty": bool(self.notify_pager_duty_integration_key),
}
)
}
)
if self.notify_on_success:
annotations.update(
{
"metaflow/notify_on_success": json.dumps(
{
"slack": bool(self.notify_slack_webhook_url),
"pager_duty": bool(self.notify_pager_duty_integration_key),
}
)
}
)

return (
WorkflowTemplate()
.metadata(
Expand All @@ -716,8 +717,9 @@ def _compile_workflow_template(self):
# is released, we should be able to support multi-namespace /
# multi-cluster scheduling.
.namespace(KUBERNETES_NAMESPACE)
.label("app.kubernetes.io/name", "metaflow-flow")
.annotations(annotations)
.annotations(self.shared_kubernetes_annotations)
.label("app.kubernetes.io/name", "metaflow-flow")
)
.spec(
WorkflowSpec()
Expand Down Expand Up @@ -747,10 +749,11 @@ def _compile_workflow_template(self):
# Set workflow metadata
.workflow_metadata(
Metadata()
.labels(self.shared_kubernetes_labels)
.label("app.kubernetes.io/name", "metaflow-run")
.label("app.kubernetes.io/part-of", "metaflow")
.annotations(
{
**annotations,
**self.shared_kubernetes_annotations,
**{"metaflow/run_id": "argo-{{workflow.name}}"},
}
Expand Down Expand Up @@ -789,6 +792,7 @@ def _compile_workflow_template(self):
.labels(self.shared_kubernetes_labels)
.label("app.kubernetes.io/name", "metaflow-task")
.annotations(self.shared_kubernetes_annotations)
.annotations(annotations)
)
# Set the entrypoint to flow name
.entrypoint(self.flow.name)
Expand Down Expand Up @@ -2814,6 +2818,7 @@ def _compile_sensor(self):
.labels(self.shared_kubernetes_labels)
.label("app.kubernetes.io/name", "metaflow-sensor")
.annotations(self.shared_kubernetes_annotations)
.annotations(trigger_annotations)
)
.spec(
SensorSpec().template(
Expand Down

0 comments on commit 6ecf354

Please sign in to comment.