diff --git a/metaflow/plugins/events_decorator.py b/metaflow/plugins/events_decorator.py index ac8309985e6..da6df04ec8b 100644 --- a/metaflow/plugins/events_decorator.py +++ b/metaflow/plugins/events_decorator.py @@ -70,7 +70,26 @@ class TriggerDecorator(FlowDecorator): "options": {}, } - def process_event_name(self, event): + def process_event(self, event): + """ + Process a single event and return a dictionary if static trigger and a function + if deploy-time trigger. + + Parameters + ---------- + event : Union[str, Dict[str, Any], Callable] + Event to process + + Returns + ------- + Union[Dict[str, Union[str, Callable]], Callable] + Processed event + + Raises + ------ + MetaflowException + If the event is not in the correct format + """ if is_stringish(event): return {"name": str(event)} elif isinstance(event, dict): @@ -82,12 +101,26 @@ def process_event_name(self, event): event["name"], DeployTimeField ): event["name"] = DeployTimeField( - "event_name", str, None, event["name"], False + "event_name", + str, + None, + event["name"], + False, + print_representation=str(event["name"]), ) - event["parameters"] = self.process_parameters(event.get("parameters", {})) + event["parameters"] = self.process_parameters( + event.get("parameters", {}), event["name"] + ) return event elif callable(event) and not isinstance(event, DeployTimeField): - return DeployTimeField("event", [str, dict], None, event, False) + return DeployTimeField( + "event", + [str, dict], + None, + event, + False, + print_representation=str(event), + ) else: raise MetaflowException( "Incorrect format for *event* attribute in *@trigger* decorator. " @@ -96,47 +129,69 @@ def process_event_name(self, event): "'parameters': {'alpha': 'beta'}})" ) - def process_parameters(self, parameters): + def process_parameters(self, parameters, event_name): + """ + Process the parameters for an event and return a dictionary of parameter mappings if + parameters was statically defined or a function if deploy-time trigger. + + Parameters + ---------- + Parameters : Union[callable, List[Union[str, Tuple[str, str]]], Dict[str, str]] + Parameters to process + + event_name : Union[str, callable] + Name of the event + + Returns + ------- + Union[Dict[Union[str, callable], Union[str, callable]], Callable] + Processed parameters + + Raises + ------ + MetaflowException + If the parameters are not in the correct format + """ new_param_values = {} - if isinstance(parameters, (list, tuple)): + if isinstance(parameters, list): + new_param_list = [] for mapping in parameters: if is_stringish(mapping): + # param_name new_param_values[mapping] = mapping - elif callable(mapping) and not isinstance(mapping, DeployTimeField): - mapping = DeployTimeField( - "parameter_val", str, None, mapping, False - ) - new_param_values[mapping] = mapping - elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: - if callable(mapping[0]) and not isinstance( - mapping[0], DeployTimeField - ): - mapping[0] = DeployTimeField( - "parameter_val", str, None, mapping[0], False - ) - if callable(mapping[1]) and not isinstance( - mapping[1], DeployTimeField - ): - mapping[1] = DeployTimeField( - "parameter_val", str, None, mapping[1], False + elif isinstance(mapping, tuple) and len(mapping) == 2: + # (param_name, field_name) + param_name, field_name = mapping + if not is_stringish(param_name) or not is_stringish(field_name): + raise MetaflowException( + f"The *parameters* attribute for event {event_name} is invalid. " + "Only strings and tuples of size 2 are allowed." ) - new_param_values[mapping[0]] = mapping[1] + new_param_values[param_name] = field_name else: raise MetaflowException( "The *parameters* attribute for event is invalid. " "It should be a list/tuple of strings and lists/tuples of size 2" ) - elif callable(parameters) and not isinstance(parameters, DeployTimeField): - return DeployTimeField( - "parameters", [list, dict, tuple], None, parameters, False - ) elif isinstance(parameters, dict): + # {Union[func, param_name]: Union[func, param_name]} for key, value in parameters.items(): - if callable(key) and not isinstance(key, DeployTimeField): - key = DeployTimeField("flow_parameter", str, None, key, False) - if callable(value) and not isinstance(value, DeployTimeField): - value = DeployTimeField("signal_parameter", str, None, value, False) + if not is_stringish(key) or not is_stringish(value): + raise MetaflowException( + f"The *parameters* attribute for event {event_name} is invalid. " + "It should be a dictionary of strings." + ) new_param_values[key] = value + elif callable(parameters) and not isinstance(parameters, DeployTimeField): + # func + return DeployTimeField( + "parameters", + [list, dict, tuple], + None, + parameters, + False, + print_representation=str(parameters), + ) return new_param_values def flow_init( @@ -158,7 +213,7 @@ def flow_init( ) elif self.attributes["event"]: event = self.attributes["event"] - processed_event = self.process_event_name(event) + processed_event = self.process_event(event) self.triggers.append(processed_event) elif self.attributes["events"]: # events attribute supports the following formats - @@ -169,13 +224,18 @@ def flow_init( if isinstance(self.attributes["events"], list): # process every event in events for event in self.attributes["events"]: - processed_event = self.process_event_name(event) + processed_event = self.process_event(event) self.triggers.append(processed_event) elif callable(self.attributes["events"]) and not isinstance( self.attributes["events"], DeployTimeField ): trig = DeployTimeField( - "events", list, None, self.attributes["events"], False + "events", + list, + None, + self.attributes["events"], + False, + print_representation=str(self.attributes["events"]), ) self.triggers.append(trig) else: @@ -208,101 +268,40 @@ def flow_init( def format_deploytime_value(self): new_triggers = [] + + # First pass to evaluate DeployTimeFields for trigger in self.triggers: # Case where trigger is a function that returns a list of events # Need to do this bc we need to iterate over list later if isinstance(trigger, DeployTimeField): evaluated_trigger = deploy_time_eval(trigger) - if isinstance(evaluated_trigger, dict): - trigger = evaluated_trigger - elif isinstance(evaluated_trigger, str): - trigger = {"name": evaluated_trigger} if isinstance(evaluated_trigger, list): - for trig in evaluated_trigger: - if is_stringish(trig): - new_triggers.append({"name": trig}) - else: # dict or another deploytimefield - new_triggers.append(trig) + for event in evaluated_trigger: + new_triggers.append(self.process_event(event)) else: - new_triggers.append(trigger) + new_triggers.append(self.process_event(evaluated_trigger)) else: new_triggers.append(trigger) + # Second pass to evaluate names + for trigger in new_triggers: + name = trigger.get("name") + if isinstance(name, DeployTimeField): + trigger["name"] = deploy_time_eval(name) + if not is_stringish(trigger["name"]): + raise MetaflowException( + f"The *name* attribute for event {trigger} is not a valid string" + ) + + # third pass to evaluate parameters + for trigger in new_triggers: + parameters = trigger.get("parameters", {}) + if isinstance(parameters, DeployTimeField): + parameters_eval = deploy_time_eval(parameters) + parameters = self.process_parameters(parameters_eval, trigger["name"]) + trigger["parameters"] = parameters + self.triggers = new_triggers - for trigger in self.triggers: - old_trigger = trigger - trigger_params = trigger.get("parameters", {}) - # Case where param is a function (can return list or dict) - if isinstance(trigger_params, DeployTimeField): - trigger_params = deploy_time_eval(trigger_params) - # If params is a list of strings, convert to dict with same key and value - if isinstance(trigger_params, (list, tuple)): - new_trigger_params = {} - for mapping in trigger_params: - if is_stringish(mapping) or callable(mapping): - new_trigger_params[mapping] = mapping - elif callable(mapping) and not isinstance(mapping, DeployTimeField): - mapping = DeployTimeField( - "parameter_val", str, None, mapping, False - ) - new_trigger_params[mapping] = mapping - elif isinstance(mapping, (list, tuple)) and len(mapping) == 2: - if callable(mapping[0]) and not isinstance( - mapping[0], DeployTimeField - ): - mapping[0] = DeployTimeField( - "parameter_val", - str, - None, - mapping[1], - False, - ) - if callable(mapping[1]) and not isinstance( - mapping[1], DeployTimeField - ): - mapping[1] = DeployTimeField( - "parameter_val", - str, - None, - mapping[1], - False, - ) - - new_trigger_params[mapping[0]] = mapping[1] - else: - raise MetaflowException( - "The *parameters* attribute for event '%s' is invalid. " - "It should be a list/tuple of strings and lists/tuples " - "of size 2" % self.attributes["event"]["name"] - ) - trigger_params = new_trigger_params - trigger["parameters"] = trigger_params - - trigger_name = trigger.get("name") - # Case where just the name is a function (always a str) - if isinstance(trigger_name, DeployTimeField): - trigger_name = deploy_time_eval(trigger_name) - trigger["name"] = trigger_name - - # Third layer - # {name:, parameters:[func, ..., ...]} - # {name:, parameters:{func : func2}} - for trigger in self.triggers: - old_trigger = trigger - trigger_params = trigger.get("parameters", {}) - new_trigger_params = {} - for key, value in trigger_params.items(): - if isinstance(value, DeployTimeField) and key is value: - evaluated_param = deploy_time_eval(value) - new_trigger_params[evaluated_param] = evaluated_param - elif isinstance(value, DeployTimeField): - new_trigger_params[key] = deploy_time_eval(value) - elif isinstance(key, DeployTimeField): - new_trigger_params[deploy_time_eval(key)] = value - else: - new_trigger_params[key] = value - trigger["parameters"] = new_trigger_params - self.triggers[self.triggers.index(old_trigger)] = trigger class TriggerOnFinishDecorator(FlowDecorator): @@ -402,7 +401,14 @@ def flow_init( if callable(flow) and not isinstance( self.attributes["flow"], DeployTimeField ): - trig = DeployTimeField("fq_name", [str, dict], None, flow, False) + trig = DeployTimeField( + "fq_name", + [str, dict], + None, + flow, + False, + print_representation=str(flow), + ) self.triggers.append(trig) else: self.triggers.extend(self._parse_static_triggers([flow])) @@ -411,7 +417,9 @@ def flow_init( # 1. flows=['FooFlow', 'BarFlow'] flows = self.attributes["flows"] if callable(flows) and not isinstance(flows, DeployTimeField): - trig = DeployTimeField("flows", list, None, flows, False) + trig = DeployTimeField( + "flows", list, None, flows, False, print_representation=str(flows) + ) self.triggers.append(trig) elif isinstance(flows, list): self.triggers.extend(self._parse_static_triggers(flows))