Skip to content

Commit

Permalink
Deploy time triggers (#2133)
Browse files Browse the repository at this point in the history
* trigger_on_finish sorta works

* trigger deco works for event

* trigger events changes

* run black

* ok this one ran black

* black ran for real

* deleting some things i missed

* add format_deploytime_value() to both decos

* fixes error with types

* ran black

* fixes failing cases

* remove json.loads and add tuple to param type

* ran black

* function within parameter

* Delete local config file

* fixing borked cases

* refactor code

* shouldn't be changes to flowspec.py

* fixing bugs

* add deploy time trigger inits to argo parameter handling (#2146)

* run black

* undo modifications to to_pod

* reset util file

* pr comment

* remove print

---------

Co-authored-by: kayla seeley <[email protected]>
Co-authored-by: Sakari Ikonen <[email protected]>
  • Loading branch information
3 people authored Nov 22, 2024
1 parent d8c7cc5 commit 50298d7
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 78 deletions.
10 changes: 8 additions & 2 deletions metaflow/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,22 +151,28 @@ def __call__(self, deploy_time=False):
return self._check_type(val, deploy_time)

def _check_type(self, val, deploy_time):

# it is easy to introduce a deploy-time function that accidentally
# returns a value whose type is not compatible with what is defined
# in Parameter. Let's catch those mistakes early here, instead of
# showing a cryptic stack trace later.

# note: this doesn't work with long in Python2 or types defined as
# click types, e.g. click.INT
TYPES = {bool: "bool", int: "int", float: "float", list: "list"}
TYPES = {bool: "bool", int: "int", float: "float", list: "list", dict: "dict"}

msg = (
"The value returned by the deploy-time function for "
"the parameter *%s* field *%s* has a wrong type. "
% (self.parameter_name, self.field)
)

if self.parameter_type in TYPES:
if isinstance(self.parameter_type, list):
if not any(isinstance(val, x) for x in self.parameter_type):
msg += "Expected one of the following %s." % TYPES[self.parameter_type]
raise ParameterFieldTypeMismatch(msg)
return str(val) if self.return_str else val
elif self.parameter_type in TYPES:
if type(val) != self.parameter_type:
msg += "Expected a %s." % TYPES[self.parameter_type]
raise ParameterFieldTypeMismatch(msg)
Expand Down
12 changes: 8 additions & 4 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,9 @@ def _process_triggers(self):
params = set(
[param.name.lower() for var, param in self.flow._get_parameters()]
)
for event in self.flow._flow_decorators.get("trigger")[0].triggers:
trigger_deco = self.flow._flow_decorators.get("trigger")[0]
trigger_deco.format_deploytime_value()
for event in trigger_deco.triggers:
parameters = {}
# TODO: Add a check to guard against names starting with numerals(?)
if not re.match(r"^[A-Za-z0-9_.-]+$", event["name"]):
Expand Down Expand Up @@ -562,9 +564,11 @@ def _process_triggers(self):

# @trigger_on_finish decorator
if self.flow._flow_decorators.get("trigger_on_finish"):
for event in self.flow._flow_decorators.get("trigger_on_finish")[
0
].triggers:
trigger_on_finish_deco = self.flow._flow_decorators.get(
"trigger_on_finish"
)[0]
trigger_on_finish_deco.format_deploytime_value()
for event in trigger_on_finish_deco.triggers:
# Actual filters are deduced here since we don't have access to
# the current object in the @trigger_on_finish decorator.
triggers.append(
Expand Down
Loading

0 comments on commit 50298d7

Please sign in to comment.