From 252a272505d832ca90e3707836e31448907aa2a9 Mon Sep 17 00:00:00 2001 From: Savin Date: Wed, 27 Nov 2024 10:46:10 -0800 Subject: [PATCH] Revert "Deploy time triggers (#2133)" This reverts commit 50298d70b4e3be4e480bec08f5e9bdb2a7eef1eb. --- metaflow/parameters.py | 10 +- metaflow/plugins/argo/argo_workflows.py | 12 +- metaflow/plugins/events_decorator.py | 325 ++++++------------------ 3 files changed, 78 insertions(+), 269 deletions(-) diff --git a/metaflow/parameters.py b/metaflow/parameters.py index e5778e6cd1e..fe0dabbda3f 100644 --- a/metaflow/parameters.py +++ b/metaflow/parameters.py @@ -151,7 +151,6 @@ def __call__(self, deploy_time=False): return self._check_type(val, deploy_time) def _check_type(self, val, deploy_time): - # it is easy to introduce a deploy-time function that accidentally # returns a value whose type is not compatible with what is defined # in Parameter. Let's catch those mistakes early here, instead of @@ -159,7 +158,7 @@ def _check_type(self, val, deploy_time): # note: this doesn't work with long in Python2 or types defined as # click types, e.g. click.INT - TYPES = {bool: "bool", int: "int", float: "float", list: "list", dict: "dict"} + TYPES = {bool: "bool", int: "int", float: "float", list: "list"} msg = ( "The value returned by the deploy-time function for " @@ -167,12 +166,7 @@ def _check_type(self, val, deploy_time): % (self.parameter_name, self.field) ) - if isinstance(self.parameter_type, list): - if not any(isinstance(val, x) for x in self.parameter_type): - msg += "Expected one of the following %s." % TYPES[self.parameter_type] - raise ParameterFieldTypeMismatch(msg) - return str(val) if self.return_str else val - elif self.parameter_type in TYPES: + if self.parameter_type in TYPES: if type(val) != self.parameter_type: msg += "Expected a %s." % TYPES[self.parameter_type] raise ParameterFieldTypeMismatch(msg) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 05371eeca69..c4e8cbd6c77 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -522,9 +522,7 @@ def _process_triggers(self): params = set( [param.name.lower() for var, param in self.flow._get_parameters()] ) - trigger_deco = self.flow._flow_decorators.get("trigger")[0] - trigger_deco.format_deploytime_value() - for event in trigger_deco.triggers: + for event in self.flow._flow_decorators.get("trigger")[0].triggers: parameters = {} # TODO: Add a check to guard against names starting with numerals(?) if not re.match(r"^[A-Za-z0-9_.-]+$", event["name"]): @@ -564,11 +562,9 @@ def _process_triggers(self): # @trigger_on_finish decorator if self.flow._flow_decorators.get("trigger_on_finish"): - trigger_on_finish_deco = self.flow._flow_decorators.get( - "trigger_on_finish" - )[0] - trigger_on_finish_deco.format_deploytime_value() - for event in trigger_on_finish_deco.triggers: + for event in self.flow._flow_decorators.get("trigger_on_finish")[ + 0 + ].triggers: # Actual filters are deduced here since we don't have access to # the current object in the @trigger_on_finish decorator. triggers.append( diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index c9090f547fb..baa6320b0ba 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -1,11 +1,9 @@ import re -import json from metaflow import current from metaflow.decorators import FlowDecorator from metaflow.exception import MetaflowException from metaflow.util import is_stringish -from metaflow.parameters import DeployTimeField, deploy_time_eval # TODO: Support dynamic parameter mapping through a context object that exposes # flow name and user name similar to parameter context @@ -70,75 +68,6 @@ class TriggerDecorator(FlowDecorator): "options": {}, } - def process_event_name(self, event): - if is_stringish(event): - return {"name": str(event)} - elif isinstance(event, dict): - if "name" not in event: - raise MetaflowException( - "The *event* attribute for *@trigger* is missing the *name* key." - ) - if callable(event["name"]) and not isinstance( - event["name"], DeployTimeField - ): - event["name"] = DeployTimeField( - "event_name", str, None, event["name"], False - ) - event["parameters"] = self.process_parameters(event.get("parameters", {})) - return event - elif callable(event) and not isinstance(event, DeployTimeField): - return DeployTimeField("event", [str, dict], None, event, False) - else: - raise MetaflowException( - "Incorrect format for *event* attribute in *@trigger* decorator. " - "Supported formats are string and dictionary - \n" - "@trigger(event='foo') or @trigger(event={'name': 'foo', " - "'parameters': {'alpha': 'beta'}})" - ) - - def process_parameters(self, parameters): - new_param_values = {} - if isinstance(parameters, (list, tuple)): - for mapping in parameters: - if is_stringish(mapping): - new_param_values[mapping] = mapping - elif callable(mapping) and not isinstance(mapping, DeployTimeField): - mapping = DeployTimeField( - "parameter_val", str, None, mapping, False - ) - new_param_values[mapping] = mapping - elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: - if callable(mapping[0]) and not isinstance( - mapping[0], DeployTimeField - ): - mapping[0] = DeployTimeField( - "parameter_val", str, None, mapping[0], False - ) - if callable(mapping[1]) and not isinstance( - mapping[1], DeployTimeField - ): - mapping[1] = DeployTimeField( - "parameter_val", str, None, mapping[1], False - ) - new_param_values[mapping[0]] = mapping[1] - else: - raise MetaflowException( - "The *parameters* attribute for event is invalid. " - "It should be a list/tuple of strings and lists/tuples of size 2" - ) - elif callable(parameters) and not isinstance(parameters, DeployTimeField): - return DeployTimeField( - "parameters", [list, dict, tuple], None, parameters, False - ) - elif isinstance(parameters, dict): - for key, value in parameters.items(): - if callable(key) and not isinstance(key, DeployTimeField): - key = DeployTimeField("flow_parameter", str, None, key, False) - if callable(value) and not isinstance(value, DeployTimeField): - value = DeployTimeField("signal_parameter", str, None, value, False) - new_param_values[key] = value - return new_param_values - def flow_init( self, flow_name, @@ -157,9 +86,41 @@ def flow_init( "attributes in *@trigger* decorator." ) elif self.attributes["event"]: - event = self.attributes["event"] - processed_event = self.process_event_name(event) - self.triggers.append(processed_event) + # event attribute supports the following formats - + # 1. event='table.prod_db.members' + # 2. event={'name': 'table.prod_db.members', + # 'parameters': {'alpha': 'member_weight'}} + if is_stringish(self.attributes["event"]): + self.triggers.append({"name": str(self.attributes["event"])}) + elif isinstance(self.attributes["event"], dict): + if "name" not in self.attributes["event"]: + raise MetaflowException( + "The *event* attribute for *@trigger* is missing the " + "*name* key." + ) + param_value = self.attributes["event"].get("parameters", {}) + if isinstance(param_value, (list, tuple)): + new_param_value = {} + for mapping in param_value: + if is_stringish(mapping): + new_param_value[mapping] = mapping + elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: + new_param_value[mapping[0]] = mapping[1] + else: + raise MetaflowException( + "The *parameters* attribute for event '%s' is invalid. " + "It should be a list/tuple of strings and lists/tuples " + "of size 2" % self.attributes["event"]["name"] + ) + self.attributes["event"]["parameters"] = new_param_value + self.triggers.append(self.attributes["event"]) + else: + raise MetaflowException( + "Incorrect format for *event* attribute in *@trigger* decorator. " + "Supported formats are string and dictionary - \n" + "@trigger(event='foo') or @trigger(event={'name': 'foo', " + "'parameters': {'alpha': 'beta'}})" + ) elif self.attributes["events"]: # events attribute supports the following formats - # 1. events=[{'name': 'table.prod_db.members', @@ -167,17 +128,43 @@ def flow_init( # {'name': 'table.prod_db.metadata', # 'parameters': {'beta': 'grade'}}] if isinstance(self.attributes["events"], list): - # process every event in events for event in self.attributes["events"]: - processed_event = self.process_event_name(event) - self.triggers.append("processed event", processed_event) - elif callable(self.attributes["events"]) and not isinstance( - self.attributes["events"], DeployTimeField - ): - trig = DeployTimeField( - "events", list, None, self.attributes["events"], False - ) - self.triggers.append(trig) + if is_stringish(event): + self.triggers.append({"name": str(event)}) + elif isinstance(event, dict): + if "name" not in event: + raise MetaflowException( + "One or more events in *events* attribute for " + "*@trigger* are missing the *name* key." + ) + param_value = event.get("parameters", {}) + if isinstance(param_value, (list, tuple)): + new_param_value = {} + for mapping in param_value: + if is_stringish(mapping): + new_param_value[mapping] = mapping + elif ( + isinstance(mapping, (list, tuple)) + and len(mapping) == 2 + ): + new_param_value[mapping[0]] = mapping[1] + else: + raise MetaflowException( + "The *parameters* attribute for event '%s' is " + "invalid. It should be a list/tuple of strings " + "and lists/tuples of size 2" % event["name"] + ) + event["parameters"] = new_param_value + self.triggers.append(event) + else: + raise MetaflowException( + "One or more events in *events* attribute in *@trigger* " + "decorator have an incorrect format. Supported format " + "is dictionary - \n" + "@trigger(events=[{'name': 'foo', 'parameters': {'alpha': " + "'beta'}}, {'name': 'bar', 'parameters': " + "{'gamma': 'kappa'}}])" + ) else: raise MetaflowException( "Incorrect format for *events* attribute in *@trigger* decorator. " @@ -191,12 +178,7 @@ def flow_init( raise MetaflowException("No event(s) specified in *@trigger* decorator.") # same event shouldn't occur more than once - names = [ - x["name"] - for x in self.triggers - if not isinstance(x, DeployTimeField) - and not isinstance(x["name"], DeployTimeField) - ] + names = [x["name"] for x in self.triggers] if len(names) != len(set(names)): raise MetaflowException( "Duplicate event names defined in *@trigger* decorator." @@ -206,104 +188,6 @@ def flow_init( # TODO: Handle scenario for local testing using --trigger. - def format_deploytime_value(self): - new_triggers = [] - for trigger in self.triggers: - # Case where trigger is a function that returns a list of events - # Need to do this bc we need to iterate over list later - if isinstance(trigger, DeployTimeField): - evaluated_trigger = deploy_time_eval(trigger) - if isinstance(evaluated_trigger, dict): - trigger = evaluated_trigger - elif isinstance(evaluated_trigger, str): - trigger = {"name": evaluated_trigger} - if isinstance(evaluated_trigger, list): - for trig in evaluated_trigger: - if is_stringish(trig): - new_triggers.append({"name": trig}) - else: # dict or another deploytimefield - new_triggers.append(trig) - else: - new_triggers.append(trigger) - else: - new_triggers.append(trigger) - - self.triggers = new_triggers - for trigger in self.triggers: - old_trigger = trigger - trigger_params = trigger.get("parameters", {}) - # Case where param is a function (can return list or dict) - if isinstance(trigger_params, DeployTimeField): - trigger_params = deploy_time_eval(trigger_params) - # If params is a list of strings, convert to dict with same key and value - if isinstance(trigger_params, (list, tuple)): - new_trigger_params = {} - for mapping in trigger_params: - if is_stringish(mapping) or callable(mapping): - new_trigger_params[mapping] = mapping - elif callable(mapping) and not isinstance(mapping, DeployTimeField): - mapping = DeployTimeField( - "parameter_val", str, None, mapping, False - ) - new_trigger_params[mapping] = mapping - elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: - if callable(mapping[0]) and not isinstance( - mapping[0], DeployTimeField - ): - mapping[0] = DeployTimeField( - "parameter_val", - str, - None, - mapping[1], - False, - ) - if callable(mapping[1]) and not isinstance( - mapping[1], DeployTimeField - ): - mapping[1] = DeployTimeField( - "parameter_val", - str, - None, - mapping[1], - False, - ) - - new_trigger_params[mapping[0]] = mapping[1] - else: - raise MetaflowException( - "The *parameters* attribute for event '%s' is invalid. " - "It should be a list/tuple of strings and lists/tuples " - "of size 2" % self.attributes["event"]["name"] - ) - trigger_params = new_trigger_params - trigger["parameters"] = trigger_params - - trigger_name = trigger.get("name") - # Case where just the name is a function (always a str) - if isinstance(trigger_name, DeployTimeField): - trigger_name = deploy_time_eval(trigger_name) - trigger["name"] = trigger_name - - # Third layer - # {name:, parameters:[func, ..., ...]} - # {name:, parameters:{func : func2}} - for trigger in self.triggers: - old_trigger = trigger - trigger_params = trigger.get("parameters", {}) - new_trigger_params = {} - for key, value in trigger_params.items(): - if isinstance(value, DeployTimeField) and key is value: - evaluated_param = deploy_time_eval(value) - new_trigger_params[evaluated_param] = evaluated_param - elif isinstance(value, DeployTimeField): - new_trigger_params[key] = deploy_time_eval(value) - elif isinstance(key, DeployTimeField): - new_trigger_params[deploy_time_eval(key)] = value - else: - new_trigger_params[key] = value - trigger["parameters"] = new_trigger_params - self.triggers[self.triggers.index(old_trigger)] = trigger - class TriggerOnFinishDecorator(FlowDecorator): """ @@ -428,13 +312,6 @@ def flow_init( "The *project_branch* attribute of the *flow* is not a string" ) self.triggers.append(result) - elif callable(self.attributes["flow"]) and not isinstance( - self.attributes["flow"], DeployTimeField - ): - trig = DeployTimeField( - "fq_name", [str, dict], None, self.attributes["flow"], False - ) - self.triggers.append(trig) else: raise MetaflowException( "Incorrect type for *flow* attribute in *@trigger_on_finish* " @@ -492,13 +369,6 @@ def flow_init( "Supported type is string or Dict[str, str]- \n" "@trigger_on_finish(flows=['FooFlow', 'BarFlow']" ) - elif callable(self.attributes["flows"]) and not isinstance( - self.attributes["flows"], DeployTimeField - ): - trig = DeployTimeField( - "flows", list, None, self.attributes["flows"], False - ) - self.triggers.append(trig) else: raise MetaflowException( "Incorrect type for *flows* attribute in *@trigger_on_finish* " @@ -513,8 +383,6 @@ def flow_init( # Make triggers @project aware for trigger in self.triggers: - if isinstance(trigger, DeployTimeField): - continue if trigger["fq_name"].count(".") == 0: # fully qualified name is just the flow name trigger["flow"] = trigger["fq_name"] @@ -559,54 +427,5 @@ def flow_init( run_objs.append(run_obj) current._update_env({"trigger": Trigger.from_runs(run_objs)}) - def _parse_fq_name(self, trigger): - if isinstance(trigger, DeployTimeField): - trigger["fq_name"] = deploy_time_eval(trigger["fq_name"]) - if trigger["fq_name"].count(".") == 0: - # fully qualified name is just the flow name - trigger["flow"] = trigger["fq_name"] - elif trigger["fq_name"].count(".") >= 2: - # fully qualified name is of the format - project.branch.flow_name - trigger["project"], tail = trigger["fq_name"].split(".", maxsplit=1) - trigger["branch"], trigger["flow"] = tail.rsplit(".", maxsplit=1) - else: - raise MetaflowException( - "Incorrect format for *flow* in *@trigger_on_finish* " - "decorator. Specify either just the *flow_name* or a fully " - "qualified name like *project_name.branch_name.flow_name*." - ) - if not re.match(r"^[A-Za-z0-9_]+$", trigger["flow"]): - raise MetaflowException( - "Invalid flow name *%s* in *@trigger_on_finish* " - "decorator. Only alphanumeric characters and " - "underscores(_) are allowed." % trigger["flow"] - ) - return trigger - - def format_deploytime_value(self): - for trigger in self.triggers: - # Case were trigger is a function that returns a list - # Need to do this bc we need to iterate over list and process - if isinstance(trigger, DeployTimeField): - deploy_value = deploy_time_eval(trigger) - if isinstance(deploy_value, list): - self.triggers = deploy_value - else: - break - for trigger in self.triggers: - # Entire trigger is a function (returns either string or dict) - old_trig = trigger - if isinstance(trigger, DeployTimeField): - trigger = deploy_time_eval(trigger) - if isinstance(trigger, dict): - trigger["fq_name"] = trigger.get("name") - trigger["project"] = trigger.get("project") - trigger["branch"] = trigger.get("project_branch") - # We also added this bc it won't be formatted yet - if isinstance(trigger, str): - trigger = {"fq_name": trigger} - trigger = self._parse_fq_name(trigger) - self.triggers[self.triggers.index(old_trig)] = trigger - def get_top_level_options(self): return list(self._option_values.items())