From 5a9426ca9a48dd1406b13e46049e3863ad9e89d9 Mon Sep 17 00:00:00 2001 From: kayla seeley Date: Sun, 3 Nov 2024 02:51:15 +0000 Subject: [PATCH 01/25] trigger_on_finish sorta works --- metaflow/parameters.py | 2 +- metaflow/plugins/events_decorator.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/metaflow/parameters.py b/metaflow/parameters.py index fe0dabbda3f..a9b8c1cadd3 100644 --- a/metaflow/parameters.py +++ b/metaflow/parameters.py @@ -158,7 +158,7 @@ def _check_type(self, val, deploy_time): # note: this doesn't work with long in Python2 or types defined as # click types, e.g. click.INT - TYPES = {bool: "bool", int: "int", float: "float", list: "list"} + TYPES = {bool: "bool", int: "int", float: "float", list: "list", dict: "dict"} msg = ( "The value returned by the deploy-time function for " diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index baa6320b0ba..2b1cafd5123 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -4,6 +4,7 @@ from metaflow.decorators import FlowDecorator from metaflow.exception import MetaflowException from metaflow.util import is_stringish +from metaflow.parameters import DeployTimeField, deploy_time_eval # TODO: Support dynamic parameter mapping through a context object that exposes # flow name and user name similar to parameter context @@ -312,6 +313,17 @@ def flow_init( "The *project_branch* attribute of the *flow* is not a string" ) self.triggers.append(result) + #DEPLOY TIME + #CAN RETURN STR OR DICT SO IDK WHAT TO DO AAAAA + #for now we pretend it's a string + # or maybe low key who cares, we can figure out in maestro.py wait no there's smth in deploytimefield + #lets see if NONE works + elif callable(self.attributes["flow"]) and not isinstance(self.attributes["flow"], DeployTimeField): + aaa = DeployTimeField("fq_name", None, None, self.attributes["flow"], False) + self.triggers.append( + aaa + ) + else: raise MetaflowException( "Incorrect type for *flow* attribute in *@trigger_on_finish* " @@ -383,6 +395,8 @@ def flow_init( # Make triggers @project aware for trigger in self.triggers: + if isinstance(trigger, DeployTimeField): + continue if trigger["fq_name"].count(".") == 0: # fully qualified name is just the flow name trigger["flow"] = trigger["fq_name"] From 427b554724d7a8244b9295e9d4ffd3e1d09d2575 Mon Sep 17 00:00:00 2001 From: kayla seeley Date: Mon, 4 Nov 2024 07:07:34 +0000 Subject: [PATCH 02/25] trigger deco works for event --- metaflow/plugins/events_decorator.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index 2b1cafd5123..87ffe0a19bb 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -115,6 +115,11 @@ def flow_init( ) self.attributes["event"]["parameters"] = new_param_value self.triggers.append(self.attributes["event"]) + elif callable(self.attributes["event"]) and not isinstance(self.attributes["event"], DeployTimeField): + trig = DeployTimeField("fq_name", None, None, self.attributes["event"], False) + self.triggers.append( + trig + ) else: raise MetaflowException( "Incorrect format for *event* attribute in *@trigger* decorator. " @@ -157,6 +162,7 @@ def flow_init( ) event["parameters"] = new_param_value self.triggers.append(event) + else: raise MetaflowException( "One or more events in *events* attribute in *@trigger* " @@ -166,7 +172,15 @@ def flow_init( "'beta'}}, {'name': 'bar', 'parameters': " "{'gamma': 'kappa'}}])" ) + + elif callable(self.attributes["events"]) and not isinstance(self.attributes["events"], DeployTimeField): + trig = DeployTimeField("events", list, None, self.attributes["events"], False) + #TODO idk if this is right + self.triggers.append( + trig + ) else: + print(self.attributes["events"]) raise MetaflowException( "Incorrect format for *events* attribute in *@trigger* decorator. " "Supported format is list - \n" @@ -179,7 +193,7 @@ def flow_init( raise MetaflowException("No event(s) specified in *@trigger* decorator.") # same event shouldn't occur more than once - names = [x["name"] for x in self.triggers] + names = [x["name"] for x in self.triggers if not isinstance(x, DeployTimeField)] if len(names) != len(set(names)): raise MetaflowException( "Duplicate event names defined in *@trigger* decorator." @@ -319,9 +333,9 @@ def flow_init( # or maybe low key who cares, we can figure out in maestro.py wait no there's smth in deploytimefield #lets see if NONE works elif callable(self.attributes["flow"]) and not isinstance(self.attributes["flow"], DeployTimeField): - aaa = DeployTimeField("fq_name", None, None, self.attributes["flow"], False) + trig = DeployTimeField("fq_name", None, None, self.attributes["flow"], False) self.triggers.append( - aaa + trig ) else: @@ -381,6 +395,11 @@ def flow_init( "Supported type is string or Dict[str, str]- \n" "@trigger_on_finish(flows=['FooFlow', 'BarFlow']" ) + elif callable(self.attributes["flows"]) and not isinstance(self.attributes["flows"], DeployTimeField): + trig = DeployTimeField("flows", list, None, self.attributes["flows"], False) + self.triggers.append( + trig + ) else: raise MetaflowException( "Incorrect type for *flows* attribute in *@trigger_on_finish* " From d77578b2ed5c1ac381b24ca279688c9ea0fa57c2 Mon Sep 17 00:00:00 2001 From: kayla seeley Date: Tue, 5 Nov 2024 00:10:57 +0000 Subject: [PATCH 03/25] trigger events changes --- metaflow/plugins/events_decorator.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index 87ffe0a19bb..0b45c9edde7 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -99,6 +99,9 @@ def flow_init( "The *event* attribute for *@trigger* is missing the " "*name* key." ) + elif callable(self.attributes["event"]["name"]) and not isinstance(self.attributes["event"]["name"], DeployTimeField): + new_name = DeployTimeField("event_name", None, None, self.attributes["event"]["name"], False) + self.attributes["event"]["name"] = new_name param_value = self.attributes["event"].get("parameters", {}) if isinstance(param_value, (list, tuple)): new_param_value = {} @@ -114,6 +117,10 @@ def flow_init( "of size 2" % self.attributes["event"]["name"] ) self.attributes["event"]["parameters"] = new_param_value + # self.triggers.append(self.attributes["event"]) + elif callable(param_value) and not isinstance(param_value, DeployTimeField): + new_param_value = DeployTimeField("param", None, None, param_value, False) + self.attributes["event"]["parameters"] = new_param_value self.triggers.append(self.attributes["event"]) elif callable(self.attributes["event"]) and not isinstance(self.attributes["event"], DeployTimeField): trig = DeployTimeField("fq_name", None, None, self.attributes["event"], False) @@ -143,6 +150,9 @@ def flow_init( "One or more events in *events* attribute for " "*@trigger* are missing the *name* key." ) + elif callable(event["name"]) and not isinstance(event["name"], DeployTimeField): + new_name = DeployTimeField("event_name", None, None, event["name"], False) + event["name"] = new_name param_value = event.get("parameters", {}) if isinstance(param_value, (list, tuple)): new_param_value = {} @@ -159,10 +169,17 @@ def flow_init( "The *parameters* attribute for event '%s' is " "invalid. It should be a list/tuple of strings " "and lists/tuples of size 2" % event["name"] - ) + ) + event["parameters"] = new_param_value + elif callable(param_value) and not isinstance(param_value, DeployTimeField): + new_param_value = DeployTimeField("param", None, None, param_value, False) event["parameters"] = new_param_value self.triggers.append(event) - + elif callable(event) and not isinstance(event, DeployTimeField): + trig = DeployTimeField("fq_name", None, None, self.attributes["event"], False) + self.triggers.append( + trig + ) else: raise MetaflowException( "One or more events in *events* attribute in *@trigger* " @@ -175,12 +192,10 @@ def flow_init( elif callable(self.attributes["events"]) and not isinstance(self.attributes["events"], DeployTimeField): trig = DeployTimeField("events", list, None, self.attributes["events"], False) - #TODO idk if this is right self.triggers.append( trig ) else: - print(self.attributes["events"]) raise MetaflowException( "Incorrect format for *events* attribute in *@trigger* decorator. " "Supported format is list - \n" @@ -327,11 +342,6 @@ def flow_init( "The *project_branch* attribute of the *flow* is not a string" ) self.triggers.append(result) - #DEPLOY TIME - #CAN RETURN STR OR DICT SO IDK WHAT TO DO AAAAA - #for now we pretend it's a string - # or maybe low key who cares, we can figure out in maestro.py wait no there's smth in deploytimefield - #lets see if NONE works elif callable(self.attributes["flow"]) and not isinstance(self.attributes["flow"], DeployTimeField): trig = DeployTimeField("fq_name", None, None, self.attributes["flow"], False) self.triggers.append( From b0b5388e783b738b6403955caf560ca02193e081 Mon Sep 17 00:00:00 2001 From: kayla seeley Date: Tue, 5 Nov 2024 07:08:03 +0000 Subject: [PATCH 04/25] run black --- metaflow/plugins/events_decorator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index 0b45c9edde7..091045cdb0f 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -347,7 +347,6 @@ def flow_init( self.triggers.append( trig ) - else: raise MetaflowException( "Incorrect type for *flow* attribute in *@trigger_on_finish* " From 1bd97e804040da7bb14d8de9491a774bc9a18d7c Mon Sep 17 00:00:00 2001 From: kayla seeley Date: Tue, 5 Nov 2024 07:10:00 +0000 Subject: [PATCH 05/25] ok this one ran black --- metaflow/plugins/events_decorator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index 091045cdb0f..d8230abf960 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -453,7 +453,6 @@ def flow_init( if options["trigger"]: from metaflow import Run from metaflow.events import Trigger - run_objs = [] for run_pathspec in options["trigger"]: if len(run_pathspec.split("/")) != 2: From 6d0a9124cf427c60d2ab67ff0e77c56c864737d2 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Tue, 5 Nov 2024 08:02:53 +0000 Subject: [PATCH 06/25] black ran for real --- metaflow/plugins/events_decorator.py | 91 ++++++++++++++++++---------- 1 file changed, 60 insertions(+), 31 deletions(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index d8230abf960..28b49ad8565 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -99,8 +99,16 @@ def flow_init( "The *event* attribute for *@trigger* is missing the " "*name* key." ) - elif callable(self.attributes["event"]["name"]) and not isinstance(self.attributes["event"]["name"], DeployTimeField): - new_name = DeployTimeField("event_name", None, None, self.attributes["event"]["name"], False) + elif callable(self.attributes["event"]["name"]) and not isinstance( + self.attributes["event"]["name"], DeployTimeField + ): + new_name = DeployTimeField( + "event_name", + None, + None, + self.attributes["event"]["name"], + False, + ) self.attributes["event"]["name"] = new_name param_value = self.attributes["event"].get("parameters", {}) if isinstance(param_value, (list, tuple)): @@ -118,15 +126,21 @@ def flow_init( ) self.attributes["event"]["parameters"] = new_param_value # self.triggers.append(self.attributes["event"]) - elif callable(param_value) and not isinstance(param_value, DeployTimeField): - new_param_value = DeployTimeField("param", None, None, param_value, False) + elif callable(param_value) and not isinstance( + param_value, DeployTimeField + ): + new_param_value = DeployTimeField( + "param", None, None, param_value, False + ) self.attributes["event"]["parameters"] = new_param_value self.triggers.append(self.attributes["event"]) - elif callable(self.attributes["event"]) and not isinstance(self.attributes["event"], DeployTimeField): - trig = DeployTimeField("fq_name", None, None, self.attributes["event"], False) - self.triggers.append( - trig + elif callable(self.attributes["event"]) and not isinstance( + self.attributes["event"], DeployTimeField + ): + trig = DeployTimeField( + "fq_name", None, None, self.attributes["event"], False ) + self.triggers.append(trig) else: raise MetaflowException( "Incorrect format for *event* attribute in *@trigger* decorator. " @@ -150,8 +164,12 @@ def flow_init( "One or more events in *events* attribute for " "*@trigger* are missing the *name* key." ) - elif callable(event["name"]) and not isinstance(event["name"], DeployTimeField): - new_name = DeployTimeField("event_name", None, None, event["name"], False) + elif callable(event["name"]) and not isinstance( + event["name"], DeployTimeField + ): + new_name = DeployTimeField( + "event_name", None, None, event["name"], False + ) event["name"] = new_name param_value = event.get("parameters", {}) if isinstance(param_value, (list, tuple)): @@ -169,17 +187,21 @@ def flow_init( "The *parameters* attribute for event '%s' is " "invalid. It should be a list/tuple of strings " "and lists/tuples of size 2" % event["name"] - ) + ) event["parameters"] = new_param_value - elif callable(param_value) and not isinstance(param_value, DeployTimeField): - new_param_value = DeployTimeField("param", None, None, param_value, False) + elif callable(param_value) and not isinstance( + param_value, DeployTimeField + ): + new_param_value = DeployTimeField( + "param", None, None, param_value, False + ) event["parameters"] = new_param_value self.triggers.append(event) elif callable(event) and not isinstance(event, DeployTimeField): - trig = DeployTimeField("fq_name", None, None, self.attributes["event"], False) - self.triggers.append( - trig - ) + trig = DeployTimeField( + "fq_name", None, None, self.attributes["event"], False + ) + self.triggers.append(trig) else: raise MetaflowException( "One or more events in *events* attribute in *@trigger* " @@ -189,12 +211,14 @@ def flow_init( "'beta'}}, {'name': 'bar', 'parameters': " "{'gamma': 'kappa'}}])" ) - - elif callable(self.attributes["events"]) and not isinstance(self.attributes["events"], DeployTimeField): - trig = DeployTimeField("events", list, None, self.attributes["events"], False) - self.triggers.append( - trig - ) + + elif callable(self.attributes["events"]) and not isinstance( + self.attributes["events"], DeployTimeField + ): + trig = DeployTimeField( + "events", list, None, self.attributes["events"], False + ) + self.triggers.append(trig) else: raise MetaflowException( "Incorrect format for *events* attribute in *@trigger* decorator. " @@ -342,11 +366,13 @@ def flow_init( "The *project_branch* attribute of the *flow* is not a string" ) self.triggers.append(result) - elif callable(self.attributes["flow"]) and not isinstance(self.attributes["flow"], DeployTimeField): - trig = DeployTimeField("fq_name", None, None, self.attributes["flow"], False) - self.triggers.append( - trig + elif callable(self.attributes["flow"]) and not isinstance( + self.attributes["flow"], DeployTimeField + ): + trig = DeployTimeField( + "fq_name", None, None, self.attributes["flow"], False ) + self.triggers.append(trig) else: raise MetaflowException( "Incorrect type for *flow* attribute in *@trigger_on_finish* " @@ -404,11 +430,13 @@ def flow_init( "Supported type is string or Dict[str, str]- \n" "@trigger_on_finish(flows=['FooFlow', 'BarFlow']" ) - elif callable(self.attributes["flows"]) and not isinstance(self.attributes["flows"], DeployTimeField): - trig = DeployTimeField("flows", list, None, self.attributes["flows"], False) - self.triggers.append( - trig + elif callable(self.attributes["flows"]) and not isinstance( + self.attributes["flows"], DeployTimeField + ): + trig = DeployTimeField( + "flows", list, None, self.attributes["flows"], False ) + self.triggers.append(trig) else: raise MetaflowException( "Incorrect type for *flows* attribute in *@trigger_on_finish* " @@ -453,6 +481,7 @@ def flow_init( if options["trigger"]: from metaflow import Run from metaflow.events import Trigger + run_objs = [] for run_pathspec in options["trigger"]: if len(run_pathspec.split("/")) != 2: From 1940626f69c21093173d573ba020f9a08ff3da45 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Tue, 5 Nov 2024 20:13:08 +0000 Subject: [PATCH 07/25] deleting some things i missed --- metaflow/plugins/events_decorator.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index 28b49ad8565..a32542006e7 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -4,7 +4,7 @@ from metaflow.decorators import FlowDecorator from metaflow.exception import MetaflowException from metaflow.util import is_stringish -from metaflow.parameters import DeployTimeField, deploy_time_eval +from metaflow.parameters import DeployTimeField # TODO: Support dynamic parameter mapping through a context object that exposes # flow name and user name similar to parameter context @@ -125,7 +125,6 @@ def flow_init( "of size 2" % self.attributes["event"]["name"] ) self.attributes["event"]["parameters"] = new_param_value - # self.triggers.append(self.attributes["event"]) elif callable(param_value) and not isinstance( param_value, DeployTimeField ): @@ -138,7 +137,7 @@ def flow_init( self.attributes["event"], DeployTimeField ): trig = DeployTimeField( - "fq_name", None, None, self.attributes["event"], False + "event", None, None, self.attributes["event"], False ) self.triggers.append(trig) else: @@ -199,7 +198,7 @@ def flow_init( self.triggers.append(event) elif callable(event) and not isinstance(event, DeployTimeField): trig = DeployTimeField( - "fq_name", None, None, self.attributes["event"], False + "event", None, None, self.attributes["event"], False ) self.triggers.append(trig) else: @@ -211,7 +210,6 @@ def flow_init( "'beta'}}, {'name': 'bar', 'parameters': " "{'gamma': 'kappa'}}])" ) - elif callable(self.attributes["events"]) and not isinstance( self.attributes["events"], DeployTimeField ): From 3f7f6409581ad4d243b863311d9d7a97fd46a7ff Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Wed, 6 Nov 2024 23:47:34 +0000 Subject: [PATCH 08/25] add format_deploytime_value() to both decos --- metaflow/plugins/events_decorator.py | 111 ++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index a32542006e7..8ad347bf720 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -1,10 +1,11 @@ import re +import json from metaflow import current from metaflow.decorators import FlowDecorator from metaflow.exception import MetaflowException from metaflow.util import is_stringish -from metaflow.parameters import DeployTimeField +from metaflow.parameters import DeployTimeField, deploy_time_eval # TODO: Support dynamic parameter mapping through a context object that exposes # flow name and user name similar to parameter context @@ -240,6 +241,61 @@ def flow_init( # TODO: Handle scenario for local testing using --trigger. + def format_deploytime_value(self): + for trigger in self.triggers: + # Case were trigger is a function that returns a list of events + # Need to do this bc we need to iterate over list later + if isinstance(trigger, DeployTimeField): + if isinstance(deploy_time_eval(trigger), list): + deploy_val = deploy_time_eval(trigger) + self.triggers.remove(trigger) + self.triggers.extend(deploy_val) + else: + break + for trigger in self.triggers: + old_trigger = trigger + # Entire event is a function (returns either string or dict) + if isinstance(trigger, DeployTimeField): + trigger = deploy_time_eval(trigger) + try: + trigger = json.loads(trigger) + except (TypeError, json.JSONDecodeError): + pass + # Case where just the name is a function (always a str) + if isinstance(trigger, str): + trigger = {"name": trigger} + trigger_params = trigger.get("parameters", {}) + # Case where param is a function (can return list or dict) + if isinstance(trigger_params, DeployTimeField): + trigger_params = deploy_time_eval(trigger_params) + try: + trigger_params = json.loads(trigger_params) + except (TypeError, json.JSONDecodeError): + pass + # If params is a list of strings, convert to dict with same key and value + if isinstance(trigger_params, (list, tuple)): + new_trigger_params = {} + for mapping in trigger_params: + if is_stringish(mapping): + new_trigger_params[mapping] = mapping + elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: + new_trigger_params[mapping[0]] = mapping[1] + else: + raise MetaflowException( + "The *parameters* attribute for event '%s' is invalid. " + "It should be a list/tuple of strings and lists/tuples " + "of size 2" % self.attributes["event"]["name"] + ) + trigger_params = new_trigger_params + + trigger_name = trigger.get("name") + # Case where just the name is a function (always a str) + if isinstance(trigger_name, DeployTimeField): + trigger_name = deploy_time_eval(trigger_name) + trigger["name"] = trigger_name + # Replace old trigger with new trigger + self.triggers[self.triggers.index(old_trigger)] = trigger + class TriggerOnFinishDecorator(FlowDecorator): """ @@ -495,5 +551,58 @@ def flow_init( run_objs.append(run_obj) current._update_env({"trigger": Trigger.from_runs(run_objs)}) + def _parse_fq_name(self, trigger): + if isinstance(trigger, DeployTimeField): + trigger["fq_name"] = deploy_time_eval(trigger["fq_name"]) + if trigger["fq_name"].count(".") == 0: + # fully qualified name is just the flow name + trigger["flow"] = trigger["fq_name"] + elif trigger["fq_name"].count(".") >= 2: + # fully qualified name is of the format - project.branch.flow_name + trigger["project"], tail = trigger["fq_name"].split(".", maxsplit=1) + trigger["branch"], trigger["flow"] = tail.rsplit(".", maxsplit=1) + else: + raise MetaflowException( + "Incorrect format for *flow* in *@trigger_on_finish* " + "decorator. Specify either just the *flow_name* or a fully " + "qualified name like *project_name.branch_name.flow_name*." + ) + if not re.match(r"^[A-Za-z0-9_]+$", trigger["flow"]): + raise MetaflowException( + "Invalid flow name *%s* in *@trigger_on_finish* " + "decorator. Only alphanumeric characters and " + "underscores(_) are allowed." % trigger["flow"] + ) + return trigger + + def format_deploytime_value(self): + for trigger in self.triggers: + # Case were trigger is a function that returns a list + # Need to do this bc we need to iterate over list and process + if isinstance(trigger, DeployTimeField): + deploy_value = deploy_time_eval(trigger) + if isinstance(deploy_value, list): + self.triggers = deploy_value + else: + break + for trigger in self.triggers: + # Entire trigger is a function (returns either string or dict) + old_trig = trigger + if isinstance(trigger, DeployTimeField): + trigger = deploy_time_eval(trigger) + try: + trigger = json.loads(trigger) + except (TypeError, json.JSONDecodeError): + pass + if isinstance(trigger, dict): + trigger["fq_name"] = trigger.get("name") + trigger["project"] = trigger.get("project") + trigger["branch"] = trigger.get("project_branch") + # We also added this bc it won't be formatted yet + if isinstance(trigger, str): + trigger = {"fq_name": trigger} + trigger = self._parse_fq_name(trigger) + self.triggers[self.triggers.index(old_trig)] = trigger + def get_top_level_options(self): return list(self._option_values.items()) From e4e597efa2e6e180810ed42a7bf117ce1ca2179c Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Sun, 10 Nov 2024 05:56:13 +0000 Subject: [PATCH 09/25] fixes error with types --- metaflow/parameters.py | 8 +++++++- metaflow/util.py | 4 ++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/metaflow/parameters.py b/metaflow/parameters.py index a9b8c1cadd3..c65e2968864 100644 --- a/metaflow/parameters.py +++ b/metaflow/parameters.py @@ -151,6 +151,7 @@ def __call__(self, deploy_time=False): return self._check_type(val, deploy_time) def _check_type(self, val, deploy_time): + # it is easy to introduce a deploy-time function that accidentally # returns a value whose type is not compatible with what is defined # in Parameter. Let's catch those mistakes early here, instead of @@ -166,7 +167,12 @@ def _check_type(self, val, deploy_time): % (self.parameter_name, self.field) ) - if self.parameter_type in TYPES: + if isinstance(self.parameter_type, list): + if not any(isinstance(val, x) for x in self.parameter_type): + msg += "Expected a %s." % TYPES[self.parameter_type] + raise ParameterFieldTypeMismatch(msg) + return str(val) if self.return_str else val + elif self.parameter_type in TYPES: if type(val) != self.parameter_type: msg += "Expected a %s." % TYPES[self.parameter_type] raise ParameterFieldTypeMismatch(msg) diff --git a/metaflow/util.py b/metaflow/util.py index e95ab9f0f87..4535c98d5a0 100644 --- a/metaflow/util.py +++ b/metaflow/util.py @@ -436,12 +436,16 @@ def to_pod(value): Value to convert to POD format. The value can be a string, number, list, dictionary, or a nested structure of these types. """ + from metaflow.parameters import DeployTimeField, deploy_time_eval + if isinstance(value, (str, int, float)): return value if isinstance(value, dict): return {to_pod(k): to_pod(v) for k, v in value.items()} if isinstance(value, (list, set, tuple)): return [to_pod(v) for v in value] + if isinstance(value, DeployTimeField): + return to_pod(deploy_time_eval(value)) return str(value) From 9beb99d7bfdb79727edb94686c8ea6889abdc721 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Sun, 10 Nov 2024 21:55:35 +0000 Subject: [PATCH 10/25] ran black --- metaflow/parameters.py | 2 +- metaflow/plugins/events_decorator.py | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/metaflow/parameters.py b/metaflow/parameters.py index c65e2968864..c1383bf224a 100644 --- a/metaflow/parameters.py +++ b/metaflow/parameters.py @@ -167,7 +167,7 @@ def _check_type(self, val, deploy_time): % (self.parameter_name, self.field) ) - if isinstance(self.parameter_type, list): + if isinstance(self.parameter_type, list): if not any(isinstance(val, x) for x in self.parameter_type): msg += "Expected a %s." % TYPES[self.parameter_type] raise ParameterFieldTypeMismatch(msg) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index 8ad347bf720..a94cd60788d 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -105,7 +105,7 @@ def flow_init( ): new_name = DeployTimeField( "event_name", - None, + str, None, self.attributes["event"]["name"], False, @@ -126,11 +126,11 @@ def flow_init( "of size 2" % self.attributes["event"]["name"] ) self.attributes["event"]["parameters"] = new_param_value - elif callable(param_value) and not isinstance( + elif callable(param_value) and not isinstance( # can be list or dict param_value, DeployTimeField ): new_param_value = DeployTimeField( - "param", None, None, param_value, False + "param", [list, dict], None, param_value, False ) self.attributes["event"]["parameters"] = new_param_value self.triggers.append(self.attributes["event"]) @@ -138,7 +138,7 @@ def flow_init( self.attributes["event"], DeployTimeField ): trig = DeployTimeField( - "event", None, None, self.attributes["event"], False + "event", dict, None, self.attributes["event"], False ) self.triggers.append(trig) else: @@ -168,7 +168,7 @@ def flow_init( event["name"], DeployTimeField ): new_name = DeployTimeField( - "event_name", None, None, event["name"], False + "event_name", str, None, event["name"], False ) event["name"] = new_name param_value = event.get("parameters", {}) @@ -193,13 +193,13 @@ def flow_init( param_value, DeployTimeField ): new_param_value = DeployTimeField( - "param", None, None, param_value, False + "param", [list, dict], None, param_value, False ) event["parameters"] = new_param_value self.triggers.append(event) elif callable(event) and not isinstance(event, DeployTimeField): trig = DeployTimeField( - "event", None, None, self.attributes["event"], False + "event", dict, None, self.attributes["event"], False ) self.triggers.append(trig) else: @@ -286,7 +286,9 @@ def format_deploytime_value(self): "It should be a list/tuple of strings and lists/tuples " "of size 2" % self.attributes["event"]["name"] ) + trigger_params = new_trigger_params + trigger["parameters"] = trigger_params trigger_name = trigger.get("name") # Case where just the name is a function (always a str) @@ -424,7 +426,7 @@ def flow_init( self.attributes["flow"], DeployTimeField ): trig = DeployTimeField( - "fq_name", None, None, self.attributes["flow"], False + "fq_name", [str, dict], None, self.attributes["flow"], False ) self.triggers.append(trig) else: From 0f1effabdf5e2d6707d7c2f7428a9d0377288204 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Thu, 14 Nov 2024 19:25:23 +0000 Subject: [PATCH 11/25] fixes failing cases --- ' | 16 +++++ .../v3_5/importlib_metadata/_compat.py | 1 - metaflow/flowspec.py | 5 +- metaflow/parameters.py | 2 +- metaflow/plugins/events_decorator.py | 69 ++++++++++--------- metaflow/util.py | 5 +- 6 files changed, 62 insertions(+), 36 deletions(-) create mode 100644 ' diff --git a/' b/' new file mode 100644 index 00000000000..75581630898 --- /dev/null +++ b/' @@ -0,0 +1,16 @@ +[core] + repositoryformatversion = 0 + filemode = true + bare = false + logallrefupdates = true +[remote "origin"] + url = git@github.com:Netflix/metaflow.git + fetch = +refs/heads/*:refs/remotes/origin/* +[branch "master"] + remote = origin + merge = refs/heads/master + vscode-merge-base = origin/master +[branch "deploy_time_triggers"] + vscode-merge-base = origin/master +[branch "temp"] + vscode-merge-base = origin/master diff --git a/metaflow/_vendor/v3_5/importlib_metadata/_compat.py b/metaflow/_vendor/v3_5/importlib_metadata/_compat.py index 303d4a22e85..dda9c1e4e12 100644 --- a/metaflow/_vendor/v3_5/importlib_metadata/_compat.py +++ b/metaflow/_vendor/v3_5/importlib_metadata/_compat.py @@ -117,7 +117,6 @@ def py2_message_from_string(text): # nocoverpy3 email.message_from_string ) - class PyPy_repr: """ Override repr for EntryPoint objects on PyPy to avoid __iter__ access. diff --git a/metaflow/flowspec.py b/metaflow/flowspec.py index 0c7ffd1f128..ec072f1f673 100644 --- a/metaflow/flowspec.py +++ b/metaflow/flowspec.py @@ -193,8 +193,11 @@ def _set_constants(self, graph, kwargs): setattr(self, var, val) # We store the DAG information as an artifact called _graph_info + for deco in flow_decorators(self): + if deco.name == "trigger": + for trig in deco.triggers: + print(trig) steps_info, graph_structure = graph.output_steps() - graph_info = { "file": os.path.basename(os.path.abspath(sys.argv[0])), "parameters": parameters_info, diff --git a/metaflow/parameters.py b/metaflow/parameters.py index c1383bf224a..e5778e6cd1e 100644 --- a/metaflow/parameters.py +++ b/metaflow/parameters.py @@ -169,7 +169,7 @@ def _check_type(self, val, deploy_time): if isinstance(self.parameter_type, list): if not any(isinstance(val, x) for x in self.parameter_type): - msg += "Expected a %s." % TYPES[self.parameter_type] + msg += "Expected one of the following %s." % TYPES[self.parameter_type] raise ParameterFieldTypeMismatch(msg) return str(val) if self.return_str else val elif self.parameter_type in TYPES: diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index a94cd60788d..df0a0ec512a 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -111,10 +111,10 @@ def flow_init( False, ) self.attributes["event"]["name"] = new_name - param_value = self.attributes["event"].get("parameters", {}) - if isinstance(param_value, (list, tuple)): + parameters = self.attributes["event"].get("parameters", {}) + if isinstance(parameters, (list, tuple)): new_param_value = {} - for mapping in param_value: + for mapping in parameters: if is_stringish(mapping): new_param_value[mapping] = mapping elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: @@ -126,11 +126,11 @@ def flow_init( "of size 2" % self.attributes["event"]["name"] ) self.attributes["event"]["parameters"] = new_param_value - elif callable(param_value) and not isinstance( # can be list or dict - param_value, DeployTimeField + elif callable(parameters) and not isinstance( # can be list or dict + parameters, DeployTimeField ): new_param_value = DeployTimeField( - "param", [list, dict], None, param_value, False + "parameters", [list, dict], None, parameters, False ) self.attributes["event"]["parameters"] = new_param_value self.triggers.append(self.attributes["event"]) @@ -138,7 +138,7 @@ def flow_init( self.attributes["event"], DeployTimeField ): trig = DeployTimeField( - "event", dict, None, self.attributes["event"], False + "event", [str, dict], None, self.attributes["event"], False ) self.triggers.append(trig) else: @@ -171,10 +171,10 @@ def flow_init( "event_name", str, None, event["name"], False ) event["name"] = new_name - param_value = event.get("parameters", {}) - if isinstance(param_value, (list, tuple)): + parameters = event.get("parameters", {}) + if isinstance(parameters, (list, tuple)): new_param_value = {} - for mapping in param_value: + for mapping in parameters: if is_stringish(mapping): new_param_value[mapping] = mapping elif ( @@ -189,17 +189,17 @@ def flow_init( "and lists/tuples of size 2" % event["name"] ) event["parameters"] = new_param_value - elif callable(param_value) and not isinstance( - param_value, DeployTimeField + elif callable(parameters) and not isinstance( + parameters, DeployTimeField ): new_param_value = DeployTimeField( - "param", [list, dict], None, param_value, False + "parameters", [list, dict], None, parameters, False ) event["parameters"] = new_param_value self.triggers.append(event) elif callable(event) and not isinstance(event, DeployTimeField): trig = DeployTimeField( - "event", dict, None, self.attributes["event"], False + "event", [str, dict], None, self.attributes["event"], False ) self.triggers.append(trig) else: @@ -231,7 +231,11 @@ def flow_init( raise MetaflowException("No event(s) specified in *@trigger* decorator.") # same event shouldn't occur more than once - names = [x["name"] for x in self.triggers if not isinstance(x, DeployTimeField)] + names = [ + x["name"] + for x in self.triggers + if not isinstance(x, DeployTimeField) and not isinstance(x["name"], DeployTimeField) + ] if len(names) != len(set(names)): raise MetaflowException( "Duplicate event names defined in *@trigger* decorator." @@ -243,27 +247,27 @@ def flow_init( def format_deploytime_value(self): for trigger in self.triggers: - # Case were trigger is a function that returns a list of events + new_triggers = self.triggers + # Case where trigger is a function that returns a list of events # Need to do this bc we need to iterate over list later if isinstance(trigger, DeployTimeField): - if isinstance(deploy_time_eval(trigger), list): - deploy_val = deploy_time_eval(trigger) - self.triggers.remove(trigger) - self.triggers.extend(deploy_val) - else: - break + evaluated_trigger = deploy_time_eval(trigger) + old_trigger = trigger + if isinstance(evaluated_trigger, dict): + trigger = evaluated_trigger + elif isinstance(evaluated_trigger, str): + trigger = {"name": evaluated_trigger} + new_triggers.remove(old_trigger) + if isinstance(evaluated_trigger, list): + if all(is_stringish(event) for event in evaluated_trigger): + new_triggers.extend({"name": event_name} for event_name in evaluated_trigger) + else: + new_triggers.append(trigger) + + self.triggers = new_triggers for trigger in self.triggers: + print(trigger) old_trigger = trigger - # Entire event is a function (returns either string or dict) - if isinstance(trigger, DeployTimeField): - trigger = deploy_time_eval(trigger) - try: - trigger = json.loads(trigger) - except (TypeError, json.JSONDecodeError): - pass - # Case where just the name is a function (always a str) - if isinstance(trigger, str): - trigger = {"name": trigger} trigger_params = trigger.get("parameters", {}) # Case where param is a function (can return list or dict) if isinstance(trigger_params, DeployTimeField): @@ -296,6 +300,7 @@ def format_deploytime_value(self): trigger_name = deploy_time_eval(trigger_name) trigger["name"] = trigger_name # Replace old trigger with new trigger + #TODO might need third layer self.triggers[self.triggers.index(old_trigger)] = trigger diff --git a/metaflow/util.py b/metaflow/util.py index 4535c98d5a0..7e9d2422cf0 100644 --- a/metaflow/util.py +++ b/metaflow/util.py @@ -445,7 +445,10 @@ def to_pod(value): if isinstance(value, (list, set, tuple)): return [to_pod(v) for v in value] if isinstance(value, DeployTimeField): - return to_pod(deploy_time_eval(value)) + + return None + # return to_pod(deploy_time_eval(value)) + return str(value) From d7211eff3e696fe1e8528dc52856c099778a532a Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Thu, 14 Nov 2024 19:44:04 +0000 Subject: [PATCH 12/25] remove json.loads and add tuple to param type --- metaflow/plugins/events_decorator.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index df0a0ec512a..b9708ec590c 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -130,7 +130,7 @@ def flow_init( parameters, DeployTimeField ): new_param_value = DeployTimeField( - "parameters", [list, dict], None, parameters, False + "parameters", [list, dict, tuple], None, parameters, False ) self.attributes["event"]["parameters"] = new_param_value self.triggers.append(self.attributes["event"]) @@ -193,7 +193,7 @@ def flow_init( parameters, DeployTimeField ): new_param_value = DeployTimeField( - "parameters", [list, dict], None, parameters, False + "parameters", [list, dict, tuple], None, parameters, False ) event["parameters"] = new_param_value self.triggers.append(event) @@ -266,16 +266,11 @@ def format_deploytime_value(self): self.triggers = new_triggers for trigger in self.triggers: - print(trigger) old_trigger = trigger trigger_params = trigger.get("parameters", {}) # Case where param is a function (can return list or dict) if isinstance(trigger_params, DeployTimeField): trigger_params = deploy_time_eval(trigger_params) - try: - trigger_params = json.loads(trigger_params) - except (TypeError, json.JSONDecodeError): - pass # If params is a list of strings, convert to dict with same key and value if isinstance(trigger_params, (list, tuple)): new_trigger_params = {} @@ -290,7 +285,6 @@ def format_deploytime_value(self): "It should be a list/tuple of strings and lists/tuples " "of size 2" % self.attributes["event"]["name"] ) - trigger_params = new_trigger_params trigger["parameters"] = trigger_params @@ -597,10 +591,6 @@ def format_deploytime_value(self): old_trig = trigger if isinstance(trigger, DeployTimeField): trigger = deploy_time_eval(trigger) - try: - trigger = json.loads(trigger) - except (TypeError, json.JSONDecodeError): - pass if isinstance(trigger, dict): trigger["fq_name"] = trigger.get("name") trigger["project"] = trigger.get("project") From b7ee41dddaac69e9dacbfbf079819399691d5509 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Thu, 14 Nov 2024 20:51:04 +0000 Subject: [PATCH 13/25] ran black --- metaflow/plugins/events_decorator.py | 15 +++++++++++---- metaflow/util.py | 3 ++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index b9708ec590c..c36bf83c660 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -193,7 +193,11 @@ def flow_init( parameters, DeployTimeField ): new_param_value = DeployTimeField( - "parameters", [list, dict, tuple], None, parameters, False + "parameters", + [list, dict, tuple], + None, + parameters, + False, ) event["parameters"] = new_param_value self.triggers.append(event) @@ -234,7 +238,8 @@ def flow_init( names = [ x["name"] for x in self.triggers - if not isinstance(x, DeployTimeField) and not isinstance(x["name"], DeployTimeField) + if not isinstance(x, DeployTimeField) + and not isinstance(x["name"], DeployTimeField) ] if len(names) != len(set(names)): raise MetaflowException( @@ -260,7 +265,9 @@ def format_deploytime_value(self): new_triggers.remove(old_trigger) if isinstance(evaluated_trigger, list): if all(is_stringish(event) for event in evaluated_trigger): - new_triggers.extend({"name": event_name} for event_name in evaluated_trigger) + new_triggers.extend( + {"name": event_name} for event_name in evaluated_trigger + ) else: new_triggers.append(trigger) @@ -294,7 +301,7 @@ def format_deploytime_value(self): trigger_name = deploy_time_eval(trigger_name) trigger["name"] = trigger_name # Replace old trigger with new trigger - #TODO might need third layer + # TODO might need third layer self.triggers[self.triggers.index(old_trigger)] = trigger diff --git a/metaflow/util.py b/metaflow/util.py index 7e9d2422cf0..885f0e9846f 100644 --- a/metaflow/util.py +++ b/metaflow/util.py @@ -11,6 +11,7 @@ from metaflow.exception import MetaflowUnknownUser, MetaflowInternalError + try: # python2 unicode_type = unicode @@ -445,7 +446,7 @@ def to_pod(value): if isinstance(value, (list, set, tuple)): return [to_pod(v) for v in value] if isinstance(value, DeployTimeField): - + return None # return to_pod(deploy_time_eval(value)) From ec97b7b879f407fe76f1ef6e5c75b1c3b8c3aed6 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Fri, 15 Nov 2024 23:50:27 +0000 Subject: [PATCH 14/25] function within parameter --- metaflow/plugins/events_decorator.py | 125 ++++++++++++++++++++++++--- 1 file changed, 112 insertions(+), 13 deletions(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index c36bf83c660..65eea0eeabe 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -117,7 +117,23 @@ def flow_init( for mapping in parameters: if is_stringish(mapping): new_param_value[mapping] = mapping + elif callable(mapping): + mapping = DeployTimeField( + "parameter_val", str, None, mapping, False + ) + new_param_value[mapping] = mapping elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: + if callable(mapping[1]) and not isinstance( + mapping[1], DeployTimeField + ): + mapping[1] = DeployTimeField( + "parameter_val", + str, + None, + mapping[1], + False, + ) + new_param_value[mapping[0]] = mapping[1] else: raise MetaflowException( @@ -133,6 +149,19 @@ def flow_init( "parameters", [list, dict, tuple], None, parameters, False ) self.attributes["event"]["parameters"] = new_param_value + + new_parameters = {} + for key, value in self.attributes["event"]["parameters"].items(): + if callable(key): + key = DeployTimeField("flow_parameter", str, None, key, False) + new_parameters[key] = value + if callable(value): + new_parameters[key] = DeployTimeField( + "signal_parameter", str, None, value, False + ) + + self.attributes["event"]["parameters"] = new_parameters + self.triggers.append(self.attributes["event"]) elif callable(self.attributes["event"]) and not isinstance( self.attributes["event"], DeployTimeField @@ -155,6 +184,7 @@ def flow_init( # {'name': 'table.prod_db.metadata', # 'parameters': {'beta': 'grade'}}] if isinstance(self.attributes["events"], list): + # process every event in events for event in self.attributes["events"]: if is_stringish(event): self.triggers.append({"name": str(event)}) @@ -172,23 +202,40 @@ def flow_init( ) event["name"] = new_name parameters = event.get("parameters", {}) + new_param_value = {} if isinstance(parameters, (list, tuple)): - new_param_value = {} for mapping in parameters: if is_stringish(mapping): new_param_value[mapping] = mapping + elif callable(mapping): + mapping = DeployTimeField( + "parameter_val", str, None, mapping, False + ) + new_param_value[mapping] = mapping elif ( isinstance(mapping, (list, tuple)) and len(mapping) == 2 ): + if callable(mapping[1]) and not isinstance( + mapping[1], DeployTimeField + ): + mapping[1] = DeployTimeField( + "parameter_val", + str, + None, + mapping[1], + False, + ) + new_param_value[mapping[0]] = mapping[1] - else: - raise MetaflowException( - "The *parameters* attribute for event '%s' is " - "invalid. It should be a list/tuple of strings " - "and lists/tuples of size 2" % event["name"] - ) - event["parameters"] = new_param_value + event["parameters"] = new_param_value + else: + raise MetaflowException( + "The *parameters* attribute for event '%s' is " + "invalid. It should be a list/tuple of strings " + "and lists/tuples of size 2" % event["name"] + ) + elif callable(parameters) and not isinstance( parameters, DeployTimeField ): @@ -200,12 +247,29 @@ def flow_init( False, ) event["parameters"] = new_param_value + new_parameters = {} + for key, value in event["parameters"].items(): + neither_changed = True + if callable(key): + key = DeployTimeField( + "flow_parameter", str, None, key, False + ) + new_parameters[key] = value + neither_changed = False + if callable(value): + new_parameters[key] = DeployTimeField( + "signal_parameter", str, None, value, False + ) + neither_changed = False + if neither_changed: + new_parameters[key] = value + + event["parameters"] = new_parameters self.triggers.append(event) elif callable(event) and not isinstance(event, DeployTimeField): - trig = DeployTimeField( - "event", [str, dict], None, self.attributes["event"], False - ) + trig = DeployTimeField("event", [str, dict], None, event, False) self.triggers.append(trig) + else: raise MetaflowException( "One or more events in *events* attribute in *@trigger* " @@ -282,9 +346,25 @@ def format_deploytime_value(self): if isinstance(trigger_params, (list, tuple)): new_trigger_params = {} for mapping in trigger_params: - if is_stringish(mapping): + if is_stringish(mapping) or callable(mapping): + new_trigger_params[mapping] = mapping + elif callable(mapping): + mapping = DeployTimeField( + "parameter_val", str, None, mapping, False + ) new_trigger_params[mapping] = mapping elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: + if callable(mapping[1]) and not isinstance( + mapping[1], DeployTimeField + ): + mapping[1] = DeployTimeField( + "parameter_val", + str, + None, + mapping[1], + False, + ) + new_trigger_params[mapping[0]] = mapping[1] else: raise MetaflowException( @@ -301,8 +381,27 @@ def format_deploytime_value(self): trigger_name = deploy_time_eval(trigger_name) trigger["name"] = trigger_name # Replace old trigger with new trigger - # TODO might need third layer + + # Third layer + # {name:, parameters:[func, ..., ...]} + # {name:, parameters:{func : func2}} + for trigger in self.triggers: + old_trigger = trigger + trigger_params = trigger.get("parameters", {}) + new_trigger_params = {} + for key, value in trigger_params.items(): + if isinstance(value, DeployTimeField) and key is value: + evaluated_param = deploy_time_eval(value) + new_trigger_params[evaluated_param] = evaluated_param + elif isinstance(value, DeployTimeField): + new_trigger_params[key] = deploy_time_eval(value) + elif isinstance(key, DeployTimeField): + new_trigger_params[deploy_time_eval(key)] = value + else: + new_trigger_params[key] = value + trigger["parameters"] = new_trigger_params self.triggers[self.triggers.index(old_trigger)] = trigger + print(self.triggers) class TriggerOnFinishDecorator(FlowDecorator): From a14197e20102f6d0c7d5f0bfd57d524ce74cefa4 Mon Sep 17 00:00:00 2001 From: KaylaSeeley <42901681+KaylaSeeley@users.noreply.github.com> Date: Mon, 18 Nov 2024 14:18:34 -0800 Subject: [PATCH 15/25] Delete local config file --- ' | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 ' diff --git a/' b/' deleted file mode 100644 index 75581630898..00000000000 --- a/' +++ /dev/null @@ -1,16 +0,0 @@ -[core] - repositoryformatversion = 0 - filemode = true - bare = false - logallrefupdates = true -[remote "origin"] - url = git@github.com:Netflix/metaflow.git - fetch = +refs/heads/*:refs/remotes/origin/* -[branch "master"] - remote = origin - merge = refs/heads/master - vscode-merge-base = origin/master -[branch "deploy_time_triggers"] - vscode-merge-base = origin/master -[branch "temp"] - vscode-merge-base = origin/master From 10b38cdd550ee821c23adb54d8a9019a68e68051 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Mon, 18 Nov 2024 20:01:49 +0000 Subject: [PATCH 16/25] fixing borked cases --- metaflow/flowspec.py | 5 ----- metaflow/plugins/events_decorator.py | 25 +++++++++++++------------ 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/metaflow/flowspec.py b/metaflow/flowspec.py index ec072f1f673..158dc7db58a 100644 --- a/metaflow/flowspec.py +++ b/metaflow/flowspec.py @@ -167,7 +167,6 @@ def _set_constants(self, graph, kwargs): seen.add(norm) seen.clear() self._success = True - parameters_info = [] for var, param in self._get_parameters(): seen.add(var) @@ -193,10 +192,6 @@ def _set_constants(self, graph, kwargs): setattr(self, var, val) # We store the DAG information as an artifact called _graph_info - for deco in flow_decorators(self): - if deco.name == "trigger": - for trig in deco.triggers: - print(trig) steps_info, graph_structure = graph.output_steps() graph_info = { "file": os.path.basename(os.path.abspath(sys.argv[0])), diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index 65eea0eeabe..78546844b69 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -150,17 +150,20 @@ def flow_init( ) self.attributes["event"]["parameters"] = new_param_value - new_parameters = {} - for key, value in self.attributes["event"]["parameters"].items(): - if callable(key): - key = DeployTimeField("flow_parameter", str, None, key, False) - new_parameters[key] = value - if callable(value): - new_parameters[key] = DeployTimeField( - "signal_parameter", str, None, value, False - ) + elif isinstance(self.attributes["event"]["parameters"], dict): + new_parameters = {} + for key, value in self.attributes["event"]["parameters"].items(): + if callable(key) and not isinstance(key, DeployTimeField): + key = DeployTimeField( + "flow_parameter", str, None, key, False + ) + new_parameters[key] = value + if callable(value) and not isinstance(value, DeployTimeField): + new_parameters[key] = DeployTimeField( + "signal_parameter", str, None, value, False + ) - self.attributes["event"]["parameters"] = new_parameters + self.attributes["event"]["parameters"] = new_parameters self.triggers.append(self.attributes["event"]) elif callable(self.attributes["event"]) and not isinstance( @@ -380,7 +383,6 @@ def format_deploytime_value(self): if isinstance(trigger_name, DeployTimeField): trigger_name = deploy_time_eval(trigger_name) trigger["name"] = trigger_name - # Replace old trigger with new trigger # Third layer # {name:, parameters:[func, ..., ...]} @@ -401,7 +403,6 @@ def format_deploytime_value(self): new_trigger_params[key] = value trigger["parameters"] = new_trigger_params self.triggers[self.triggers.index(old_trigger)] = trigger - print(self.triggers) class TriggerOnFinishDecorator(FlowDecorator): From 4d14b398e2ec0e38fb877fcc3385d7517568aad9 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Tue, 19 Nov 2024 02:48:35 +0000 Subject: [PATCH 17/25] refactor code --- .../v3_5/importlib_metadata/_compat.py | 1 + metaflow/plugins/events_decorator.py | 278 ++++++------------ 2 files changed, 90 insertions(+), 189 deletions(-) diff --git a/metaflow/_vendor/v3_5/importlib_metadata/_compat.py b/metaflow/_vendor/v3_5/importlib_metadata/_compat.py index dda9c1e4e12..303d4a22e85 100644 --- a/metaflow/_vendor/v3_5/importlib_metadata/_compat.py +++ b/metaflow/_vendor/v3_5/importlib_metadata/_compat.py @@ -117,6 +117,7 @@ def py2_message_from_string(text): # nocoverpy3 email.message_from_string ) + class PyPy_repr: """ Override repr for EntryPoint objects on PyPy to avoid __iter__ access. diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index 78546844b69..e20da431215 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -70,6 +70,75 @@ class TriggerDecorator(FlowDecorator): "options": {}, } + def process_event_name(self, event): + if is_stringish(event): + return {"name": str(event)} + elif isinstance(event, dict): + if "name" not in event: + raise MetaflowException( + "The *event* attribute for *@trigger* is missing the *name* key." + ) + if callable(event["name"]) and not isinstance( + event["name"], DeployTimeField + ): + event["name"] = DeployTimeField( + "event_name", str, None, event["name"], False + ) + event["parameters"] = self.process_parameters(event.get("parameters", {})) + return event + elif callable(event) and not isinstance(event, DeployTimeField): + return DeployTimeField("event", [str, dict], None, event, False) + else: + raise MetaflowException( + "Incorrect format for *event* attribute in *@trigger* decorator. " + "Supported formats are string and dictionary - \n" + "@trigger(event='foo') or @trigger(event={'name': 'foo', " + "'parameters': {'alpha': 'beta'}})" + ) + + def process_parameters(self, parameters): + new_param_values = {} + if isinstance(parameters, (list, tuple)): + for mapping in parameters: + if is_stringish(mapping): + new_param_values[mapping] = mapping + elif callable(mapping) and not isinstance(mapping, DeployTimeField): + mapping = DeployTimeField( + "parameter_val", str, None, mapping, False + ) + new_param_values[mapping] = mapping + elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: + if callable(mapping[0]) and not isinstance( + mapping[0], DeployTimeField + ): + mapping[0] = DeployTimeField( + "parameter_val", str, None, mapping[0], False + ) + if callable(mapping[1]) and not isinstance( + mapping[1], DeployTimeField + ): + mapping[1] = DeployTimeField( + "parameter_val", str, None, mapping[1], False + ) + new_param_values[mapping[0]] = mapping[1] + else: + raise MetaflowException( + "The *parameters* attribute for event is invalid. " + "It should be a list/tuple of strings and lists/tuples of size 2" + ) + elif callable(parameters) and not isinstance(parameters, DeployTimeField): + return DeployTimeField( + "parameters", [list, dict, tuple], None, parameters, False + ) + elif isinstance(parameters, dict): + for key, value in parameters.items(): + if callable(key) and not isinstance(key, DeployTimeField): + key = DeployTimeField("flow_parameter", str, None, key, False) + if callable(value) and not isinstance(value, DeployTimeField): + value = DeployTimeField("signal_parameter", str, None, value, False) + new_param_values[key] = value + return new_param_values + def flow_init( self, flow_name, @@ -88,98 +157,9 @@ def flow_init( "attributes in *@trigger* decorator." ) elif self.attributes["event"]: - # event attribute supports the following formats - - # 1. event='table.prod_db.members' - # 2. event={'name': 'table.prod_db.members', - # 'parameters': {'alpha': 'member_weight'}} - if is_stringish(self.attributes["event"]): - self.triggers.append({"name": str(self.attributes["event"])}) - elif isinstance(self.attributes["event"], dict): - if "name" not in self.attributes["event"]: - raise MetaflowException( - "The *event* attribute for *@trigger* is missing the " - "*name* key." - ) - elif callable(self.attributes["event"]["name"]) and not isinstance( - self.attributes["event"]["name"], DeployTimeField - ): - new_name = DeployTimeField( - "event_name", - str, - None, - self.attributes["event"]["name"], - False, - ) - self.attributes["event"]["name"] = new_name - parameters = self.attributes["event"].get("parameters", {}) - if isinstance(parameters, (list, tuple)): - new_param_value = {} - for mapping in parameters: - if is_stringish(mapping): - new_param_value[mapping] = mapping - elif callable(mapping): - mapping = DeployTimeField( - "parameter_val", str, None, mapping, False - ) - new_param_value[mapping] = mapping - elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: - if callable(mapping[1]) and not isinstance( - mapping[1], DeployTimeField - ): - mapping[1] = DeployTimeField( - "parameter_val", - str, - None, - mapping[1], - False, - ) - - new_param_value[mapping[0]] = mapping[1] - else: - raise MetaflowException( - "The *parameters* attribute for event '%s' is invalid. " - "It should be a list/tuple of strings and lists/tuples " - "of size 2" % self.attributes["event"]["name"] - ) - self.attributes["event"]["parameters"] = new_param_value - elif callable(parameters) and not isinstance( # can be list or dict - parameters, DeployTimeField - ): - new_param_value = DeployTimeField( - "parameters", [list, dict, tuple], None, parameters, False - ) - self.attributes["event"]["parameters"] = new_param_value - - elif isinstance(self.attributes["event"]["parameters"], dict): - new_parameters = {} - for key, value in self.attributes["event"]["parameters"].items(): - if callable(key) and not isinstance(key, DeployTimeField): - key = DeployTimeField( - "flow_parameter", str, None, key, False - ) - new_parameters[key] = value - if callable(value) and not isinstance(value, DeployTimeField): - new_parameters[key] = DeployTimeField( - "signal_parameter", str, None, value, False - ) - - self.attributes["event"]["parameters"] = new_parameters - - self.triggers.append(self.attributes["event"]) - elif callable(self.attributes["event"]) and not isinstance( - self.attributes["event"], DeployTimeField - ): - trig = DeployTimeField( - "event", [str, dict], None, self.attributes["event"], False - ) - self.triggers.append(trig) - else: - raise MetaflowException( - "Incorrect format for *event* attribute in *@trigger* decorator. " - "Supported formats are string and dictionary - \n" - "@trigger(event='foo') or @trigger(event={'name': 'foo', " - "'parameters': {'alpha': 'beta'}})" - ) + event = self.attributes["event"] + processed_event = self.process_event_name(event) + self.triggers.append(processed_event) elif self.attributes["events"]: # events attribute supports the following formats - # 1. events=[{'name': 'table.prod_db.members', @@ -189,99 +169,8 @@ def flow_init( if isinstance(self.attributes["events"], list): # process every event in events for event in self.attributes["events"]: - if is_stringish(event): - self.triggers.append({"name": str(event)}) - elif isinstance(event, dict): - if "name" not in event: - raise MetaflowException( - "One or more events in *events* attribute for " - "*@trigger* are missing the *name* key." - ) - elif callable(event["name"]) and not isinstance( - event["name"], DeployTimeField - ): - new_name = DeployTimeField( - "event_name", str, None, event["name"], False - ) - event["name"] = new_name - parameters = event.get("parameters", {}) - new_param_value = {} - if isinstance(parameters, (list, tuple)): - for mapping in parameters: - if is_stringish(mapping): - new_param_value[mapping] = mapping - elif callable(mapping): - mapping = DeployTimeField( - "parameter_val", str, None, mapping, False - ) - new_param_value[mapping] = mapping - elif ( - isinstance(mapping, (list, tuple)) - and len(mapping) == 2 - ): - if callable(mapping[1]) and not isinstance( - mapping[1], DeployTimeField - ): - mapping[1] = DeployTimeField( - "parameter_val", - str, - None, - mapping[1], - False, - ) - - new_param_value[mapping[0]] = mapping[1] - event["parameters"] = new_param_value - else: - raise MetaflowException( - "The *parameters* attribute for event '%s' is " - "invalid. It should be a list/tuple of strings " - "and lists/tuples of size 2" % event["name"] - ) - - elif callable(parameters) and not isinstance( - parameters, DeployTimeField - ): - new_param_value = DeployTimeField( - "parameters", - [list, dict, tuple], - None, - parameters, - False, - ) - event["parameters"] = new_param_value - new_parameters = {} - for key, value in event["parameters"].items(): - neither_changed = True - if callable(key): - key = DeployTimeField( - "flow_parameter", str, None, key, False - ) - new_parameters[key] = value - neither_changed = False - if callable(value): - new_parameters[key] = DeployTimeField( - "signal_parameter", str, None, value, False - ) - neither_changed = False - if neither_changed: - new_parameters[key] = value - - event["parameters"] = new_parameters - self.triggers.append(event) - elif callable(event) and not isinstance(event, DeployTimeField): - trig = DeployTimeField("event", [str, dict], None, event, False) - self.triggers.append(trig) - - else: - raise MetaflowException( - "One or more events in *events* attribute in *@trigger* " - "decorator have an incorrect format. Supported format " - "is dictionary - \n" - "@trigger(events=[{'name': 'foo', 'parameters': {'alpha': " - "'beta'}}, {'name': 'bar', 'parameters': " - "{'gamma': 'kappa'}}])" - ) + processed_event = self.process_event_name(event) + self.triggers.append("processed event", processed_event) elif callable(self.attributes["events"]) and not isinstance( self.attributes["events"], DeployTimeField ): @@ -318,18 +207,18 @@ def flow_init( # TODO: Handle scenario for local testing using --trigger. def format_deploytime_value(self): + new_triggers = self.triggers for trigger in self.triggers: - new_triggers = self.triggers # Case where trigger is a function that returns a list of events # Need to do this bc we need to iterate over list later if isinstance(trigger, DeployTimeField): evaluated_trigger = deploy_time_eval(trigger) - old_trigger = trigger + old_trig = trigger if isinstance(evaluated_trigger, dict): trigger = evaluated_trigger elif isinstance(evaluated_trigger, str): trigger = {"name": evaluated_trigger} - new_triggers.remove(old_trigger) + new_triggers.remove(old_trig) if isinstance(evaluated_trigger, list): if all(is_stringish(event) for event in evaluated_trigger): new_triggers.extend( @@ -351,12 +240,22 @@ def format_deploytime_value(self): for mapping in trigger_params: if is_stringish(mapping) or callable(mapping): new_trigger_params[mapping] = mapping - elif callable(mapping): + elif callable(mapping) and not isinstance(mapping, DeployTimeField): mapping = DeployTimeField( "parameter_val", str, None, mapping, False ) new_trigger_params[mapping] = mapping elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: + if callable(mapping[0]) and not isinstance( + mapping[0], DeployTimeField + ): + mapping[0] = DeployTimeField( + "parameter_val", + str, + None, + mapping[1], + False, + ) if callable(mapping[1]) and not isinstance( mapping[1], DeployTimeField ): @@ -403,6 +302,7 @@ def format_deploytime_value(self): new_trigger_params[key] = value trigger["parameters"] = new_trigger_params self.triggers[self.triggers.index(old_trigger)] = trigger + print(self.triggers) class TriggerOnFinishDecorator(FlowDecorator): From 764105c0b1cbbede2dc638ffea1458f6fff753b9 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Tue, 19 Nov 2024 02:51:09 +0000 Subject: [PATCH 18/25] shouldn't be changes to flowspec.py --- metaflow/flowspec.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metaflow/flowspec.py b/metaflow/flowspec.py index 158dc7db58a..0c7ffd1f128 100644 --- a/metaflow/flowspec.py +++ b/metaflow/flowspec.py @@ -167,6 +167,7 @@ def _set_constants(self, graph, kwargs): seen.add(norm) seen.clear() self._success = True + parameters_info = [] for var, param in self._get_parameters(): seen.add(var) @@ -193,6 +194,7 @@ def _set_constants(self, graph, kwargs): # We store the DAG information as an artifact called _graph_info steps_info, graph_structure = graph.output_steps() + graph_info = { "file": os.path.basename(os.path.abspath(sys.argv[0])), "parameters": parameters_info, From 3c2f536d73f6d4ea3bb945458e810b50d7160bf9 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Wed, 20 Nov 2024 08:20:41 +0000 Subject: [PATCH 19/25] fixing bugs --- metaflow/plugins/events_decorator.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index e20da431215..d33f0f9f761 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -220,10 +220,13 @@ def format_deploytime_value(self): trigger = {"name": evaluated_trigger} new_triggers.remove(old_trig) if isinstance(evaluated_trigger, list): - if all(is_stringish(event) for event in evaluated_trigger): - new_triggers.extend( - {"name": event_name} for event_name in evaluated_trigger + for trig in evaluated_trigger: + if is_stringish(trig): + new_triggers.append( + {"name": trig} ) + else: #dict or another deploytimefield + new_triggers.append(trig) else: new_triggers.append(trigger) @@ -302,8 +305,6 @@ def format_deploytime_value(self): new_trigger_params[key] = value trigger["parameters"] = new_trigger_params self.triggers[self.triggers.index(old_trigger)] = trigger - print(self.triggers) - class TriggerOnFinishDecorator(FlowDecorator): """ @@ -599,7 +600,7 @@ def format_deploytime_value(self): if isinstance(trigger, DeployTimeField): trigger = deploy_time_eval(trigger) if isinstance(trigger, dict): - trigger["fq_name"] = trigger.get("name") + trigger["fq_name"] = trigger.get("name") trigger["project"] = trigger.get("project") trigger["branch"] = trigger.get("project_branch") # We also added this bc it won't be formatted yet From f4e3f26538ea440fd807ac8ea73f273b8878783e Mon Sep 17 00:00:00 2001 From: Sakari Ikonen <64256562+saikonen@users.noreply.github.com> Date: Wed, 20 Nov 2024 10:28:14 +0200 Subject: [PATCH 20/25] add deploy time trigger inits to argo parameter handling (#2146) --- metaflow/plugins/argo/argo_workflows.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index c4e8cbd6c77..05371eeca69 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -522,7 +522,9 @@ def _process_triggers(self): params = set( [param.name.lower() for var, param in self.flow._get_parameters()] ) - for event in self.flow._flow_decorators.get("trigger")[0].triggers: + trigger_deco = self.flow._flow_decorators.get("trigger")[0] + trigger_deco.format_deploytime_value() + for event in trigger_deco.triggers: parameters = {} # TODO: Add a check to guard against names starting with numerals(?) if not re.match(r"^[A-Za-z0-9_.-]+$", event["name"]): @@ -562,9 +564,11 @@ def _process_triggers(self): # @trigger_on_finish decorator if self.flow._flow_decorators.get("trigger_on_finish"): - for event in self.flow._flow_decorators.get("trigger_on_finish")[ - 0 - ].triggers: + trigger_on_finish_deco = self.flow._flow_decorators.get( + "trigger_on_finish" + )[0] + trigger_on_finish_deco.format_deploytime_value() + for event in trigger_on_finish_deco.triggers: # Actual filters are deduced here since we don't have access to # the current object in the @trigger_on_finish decorator. triggers.append( From d7bae9f1f7a21043e97536de8bcb0544eda24184 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Wed, 20 Nov 2024 18:03:30 +0000 Subject: [PATCH 21/25] run black --- metaflow/plugins/events_decorator.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index d33f0f9f761..f0697c1be4a 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -222,10 +222,8 @@ def format_deploytime_value(self): if isinstance(evaluated_trigger, list): for trig in evaluated_trigger: if is_stringish(trig): - new_triggers.append( - {"name": trig} - ) - else: #dict or another deploytimefield + new_triggers.append({"name": trig}) + else: # dict or another deploytimefield new_triggers.append(trig) else: new_triggers.append(trigger) @@ -306,6 +304,7 @@ def format_deploytime_value(self): trigger["parameters"] = new_trigger_params self.triggers[self.triggers.index(old_trigger)] = trigger + class TriggerOnFinishDecorator(FlowDecorator): """ Specifies the flow(s) that this flow depends on. @@ -600,7 +599,7 @@ def format_deploytime_value(self): if isinstance(trigger, DeployTimeField): trigger = deploy_time_eval(trigger) if isinstance(trigger, dict): - trigger["fq_name"] = trigger.get("name") + trigger["fq_name"] = trigger.get("name") trigger["project"] = trigger.get("project") trigger["branch"] = trigger.get("project_branch") # We also added this bc it won't be formatted yet From aee1b5796124a752e9f05a284086cfa50efe85ee Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Wed, 20 Nov 2024 22:54:19 +0000 Subject: [PATCH 22/25] undo modifications to to_pod --- metaflow/util.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/metaflow/util.py b/metaflow/util.py index 885f0e9846f..ceed45ab750 100644 --- a/metaflow/util.py +++ b/metaflow/util.py @@ -445,11 +445,6 @@ def to_pod(value): return {to_pod(k): to_pod(v) for k, v in value.items()} if isinstance(value, (list, set, tuple)): return [to_pod(v) for v in value] - if isinstance(value, DeployTimeField): - - return None - # return to_pod(deploy_time_eval(value)) - return str(value) From ecd5cd82f7a17d2939e5334021187e3acc6a20a0 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Thu, 21 Nov 2024 07:37:46 +0000 Subject: [PATCH 23/25] reset util file --- metaflow/util.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/metaflow/util.py b/metaflow/util.py index ceed45ab750..e95ab9f0f87 100644 --- a/metaflow/util.py +++ b/metaflow/util.py @@ -11,7 +11,6 @@ from metaflow.exception import MetaflowUnknownUser, MetaflowInternalError - try: # python2 unicode_type = unicode @@ -437,8 +436,6 @@ def to_pod(value): Value to convert to POD format. The value can be a string, number, list, dictionary, or a nested structure of these types. """ - from metaflow.parameters import DeployTimeField, deploy_time_eval - if isinstance(value, (str, int, float)): return value if isinstance(value, dict): From f8def01698b6b263233a4aabde23f4b3ec97a5e9 Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Thu, 21 Nov 2024 20:17:39 +0000 Subject: [PATCH 24/25] pr comment --- metaflow/plugins/events_decorator.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index f0697c1be4a..7846393204f 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -207,18 +207,16 @@ def flow_init( # TODO: Handle scenario for local testing using --trigger. def format_deploytime_value(self): - new_triggers = self.triggers + new_triggers = [] for trigger in self.triggers: # Case where trigger is a function that returns a list of events # Need to do this bc we need to iterate over list later if isinstance(trigger, DeployTimeField): evaluated_trigger = deploy_time_eval(trigger) - old_trig = trigger if isinstance(evaluated_trigger, dict): trigger = evaluated_trigger elif isinstance(evaluated_trigger, str): trigger = {"name": evaluated_trigger} - new_triggers.remove(old_trig) if isinstance(evaluated_trigger, list): for trig in evaluated_trigger: if is_stringish(trig): @@ -227,6 +225,8 @@ def format_deploytime_value(self): new_triggers.append(trig) else: new_triggers.append(trigger) + else: + new_triggers.append(trigger) self.triggers = new_triggers for trigger in self.triggers: @@ -303,6 +303,7 @@ def format_deploytime_value(self): new_trigger_params[key] = value trigger["parameters"] = new_trigger_params self.triggers[self.triggers.index(old_trigger)] = trigger + print(self.triggers) class TriggerOnFinishDecorator(FlowDecorator): From b88387d1fbb0c28e818ad5b7bc5ea72ae0f7948c Mon Sep 17 00:00:00 2001 From: KaylaSeeley Date: Fri, 22 Nov 2024 01:51:11 +0000 Subject: [PATCH 25/25] remove print --- metaflow/plugins/events_decorator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index 7846393204f..c9090f547fb 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -303,7 +303,6 @@ def format_deploytime_value(self): new_trigger_params[key] = value trigger["parameters"] = new_trigger_params self.triggers[self.triggers.index(old_trigger)] = trigger - print(self.triggers) class TriggerOnFinishDecorator(FlowDecorator):