Skip to content

Refactor trigger decorator and deploy time evaluation logic #2290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 129 additions & 121 deletions metaflow/plugins/events_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,26 @@ class TriggerDecorator(FlowDecorator):
"options": {},
}

def process_event_name(self, event):
def process_event(self, event):
"""
Process a single event and return a dictionary if static trigger and a function
if deploy-time trigger.

Parameters
----------
event : Union[str, Dict[str, Any], Callable]
Event to process

Returns
-------
Union[Dict[str, Union[str, Callable]], Callable]
Processed event

Raises
------
MetaflowException
If the event is not in the correct format
"""
if is_stringish(event):
return {"name": str(event)}
elif isinstance(event, dict):
Expand All @@ -82,12 +101,26 @@ def process_event_name(self, event):
event["name"], DeployTimeField
):
event["name"] = DeployTimeField(
"event_name", str, None, event["name"], False
"event_name",
str,
None,
event["name"],
False,
print_representation=str(event["name"]),
)
event["parameters"] = self.process_parameters(event.get("parameters", {}))
event["parameters"] = self.process_parameters(
event.get("parameters", {}), event["name"]
)
return event
elif callable(event) and not isinstance(event, DeployTimeField):
return DeployTimeField("event", [str, dict], None, event, False)
return DeployTimeField(
"event",
[str, dict],
None,
event,
False,
print_representation=str(event),
)
else:
raise MetaflowException(
"Incorrect format for *event* attribute in *@trigger* decorator. "
Expand All @@ -96,47 +129,69 @@ def process_event_name(self, event):
"'parameters': {'alpha': 'beta'}})"
)

def process_parameters(self, parameters):
def process_parameters(self, parameters, event_name):
"""
Process the parameters for an event and return a dictionary of parameter mappings if
parameters was statically defined or a function if deploy-time trigger.

Parameters
----------
Parameters : Union[callable, List[Union[str, Tuple[str, str]]], Dict[str, str]]
Parameters to process

event_name : Union[str, callable]
Name of the event

Returns
-------
Union[Dict[Union[str, callable], Union[str, callable]], Callable]
Processed parameters

Raises
------
MetaflowException
If the parameters are not in the correct format
"""
new_param_values = {}
if isinstance(parameters, (list, tuple)):
if isinstance(parameters, list):
new_param_list = []
for mapping in parameters:
if is_stringish(mapping):
# param_name
new_param_values[mapping] = mapping
elif callable(mapping) and not isinstance(mapping, DeployTimeField):
mapping = DeployTimeField(
"parameter_val", str, None, mapping, False
)
new_param_values[mapping] = mapping
elif isinstance(mapping, (list, tuple)) and len(mapping) == 2:
if callable(mapping[0]) and not isinstance(
mapping[0], DeployTimeField
):
mapping[0] = DeployTimeField(
"parameter_val", str, None, mapping[0], False
)
if callable(mapping[1]) and not isinstance(
mapping[1], DeployTimeField
):
mapping[1] = DeployTimeField(
"parameter_val", str, None, mapping[1], False
elif isinstance(mapping, tuple) and len(mapping) == 2:
# (param_name, field_name)
param_name, field_name = mapping
if not is_stringish(param_name) or not is_stringish(field_name):
raise MetaflowException(
f"The *parameters* attribute for event {event_name} is invalid. "
"Only strings and tuples of size 2 are allowed."
)
new_param_values[mapping[0]] = mapping[1]
new_param_values[param_name] = field_name
else:
raise MetaflowException(
"The *parameters* attribute for event is invalid. "
"It should be a list/tuple of strings and lists/tuples of size 2"
)
elif callable(parameters) and not isinstance(parameters, DeployTimeField):
return DeployTimeField(
"parameters", [list, dict, tuple], None, parameters, False
)
elif isinstance(parameters, dict):
# {Union[func, param_name]: Union[func, param_name]}
for key, value in parameters.items():
if callable(key) and not isinstance(key, DeployTimeField):
key = DeployTimeField("flow_parameter", str, None, key, False)
if callable(value) and not isinstance(value, DeployTimeField):
value = DeployTimeField("signal_parameter", str, None, value, False)
if not is_stringish(key) or not is_stringish(value):
raise MetaflowException(
f"The *parameters* attribute for event {event_name} is invalid. "
"It should be a dictionary of strings."
)
new_param_values[key] = value
elif callable(parameters) and not isinstance(parameters, DeployTimeField):
# func
return DeployTimeField(
"parameters",
[list, dict, tuple],
None,
parameters,
False,
print_representation=str(parameters),
)
return new_param_values

def flow_init(
Expand All @@ -158,7 +213,7 @@ def flow_init(
)
elif self.attributes["event"]:
event = self.attributes["event"]
processed_event = self.process_event_name(event)
processed_event = self.process_event(event)
self.triggers.append(processed_event)
elif self.attributes["events"]:
# events attribute supports the following formats -
Expand All @@ -169,13 +224,18 @@ def flow_init(
if isinstance(self.attributes["events"], list):
# process every event in events
for event in self.attributes["events"]:
processed_event = self.process_event_name(event)
processed_event = self.process_event(event)
self.triggers.append(processed_event)
elif callable(self.attributes["events"]) and not isinstance(
self.attributes["events"], DeployTimeField
):
trig = DeployTimeField(
"events", list, None, self.attributes["events"], False
"events",
list,
None,
self.attributes["events"],
False,
print_representation=str(self.attributes["events"]),
)
self.triggers.append(trig)
else:
Expand Down Expand Up @@ -208,101 +268,40 @@ def flow_init(

def format_deploytime_value(self):
new_triggers = []

# First pass to evaluate DeployTimeFields
for trigger in self.triggers:
# Case where trigger is a function that returns a list of events
# Need to do this bc we need to iterate over list later
if isinstance(trigger, DeployTimeField):
evaluated_trigger = deploy_time_eval(trigger)
if isinstance(evaluated_trigger, dict):
trigger = evaluated_trigger
elif isinstance(evaluated_trigger, str):
trigger = {"name": evaluated_trigger}
if isinstance(evaluated_trigger, list):
for trig in evaluated_trigger:
if is_stringish(trig):
new_triggers.append({"name": trig})
else: # dict or another deploytimefield
new_triggers.append(trig)
for event in evaluated_trigger:
new_triggers.append(self.process_event(event))
else:
new_triggers.append(trigger)
new_triggers.append(self.process_event(evaluated_trigger))
else:
new_triggers.append(trigger)

# Second pass to evaluate names
for trigger in new_triggers:
name = trigger.get("name")
if isinstance(name, DeployTimeField):
trigger["name"] = deploy_time_eval(name)
if not is_stringish(trigger["name"]):
raise MetaflowException(
f"The *name* attribute for event {trigger} is not a valid string"
)

# third pass to evaluate parameters
for trigger in new_triggers:
parameters = trigger.get("parameters", {})
if isinstance(parameters, DeployTimeField):
parameters_eval = deploy_time_eval(parameters)
parameters = self.process_parameters(parameters_eval, trigger["name"])
trigger["parameters"] = parameters

self.triggers = new_triggers
for trigger in self.triggers:
old_trigger = trigger
trigger_params = trigger.get("parameters", {})
# Case where param is a function (can return list or dict)
if isinstance(trigger_params, DeployTimeField):
trigger_params = deploy_time_eval(trigger_params)
# If params is a list of strings, convert to dict with same key and value
if isinstance(trigger_params, (list, tuple)):
new_trigger_params = {}
for mapping in trigger_params:
if is_stringish(mapping) or callable(mapping):
new_trigger_params[mapping] = mapping
elif callable(mapping) and not isinstance(mapping, DeployTimeField):
mapping = DeployTimeField(
"parameter_val", str, None, mapping, False
)
new_trigger_params[mapping] = mapping
elif isinstance(mapping, (list, tuple)) and len(mapping) == 2:
if callable(mapping[0]) and not isinstance(
mapping[0], DeployTimeField
):
mapping[0] = DeployTimeField(
"parameter_val",
str,
None,
mapping[1],
False,
)
if callable(mapping[1]) and not isinstance(
mapping[1], DeployTimeField
):
mapping[1] = DeployTimeField(
"parameter_val",
str,
None,
mapping[1],
False,
)

new_trigger_params[mapping[0]] = mapping[1]
else:
raise MetaflowException(
"The *parameters* attribute for event '%s' is invalid. "
"It should be a list/tuple of strings and lists/tuples "
"of size 2" % self.attributes["event"]["name"]
)
trigger_params = new_trigger_params
trigger["parameters"] = trigger_params

trigger_name = trigger.get("name")
# Case where just the name is a function (always a str)
if isinstance(trigger_name, DeployTimeField):
trigger_name = deploy_time_eval(trigger_name)
trigger["name"] = trigger_name

# Third layer
# {name:, parameters:[func, ..., ...]}
# {name:, parameters:{func : func2}}
for trigger in self.triggers:
old_trigger = trigger
trigger_params = trigger.get("parameters", {})
new_trigger_params = {}
for key, value in trigger_params.items():
if isinstance(value, DeployTimeField) and key is value:
evaluated_param = deploy_time_eval(value)
new_trigger_params[evaluated_param] = evaluated_param
elif isinstance(value, DeployTimeField):
new_trigger_params[key] = deploy_time_eval(value)
elif isinstance(key, DeployTimeField):
new_trigger_params[deploy_time_eval(key)] = value
else:
new_trigger_params[key] = value
trigger["parameters"] = new_trigger_params
self.triggers[self.triggers.index(old_trigger)] = trigger


class TriggerOnFinishDecorator(FlowDecorator):
Expand Down Expand Up @@ -402,7 +401,14 @@ def flow_init(
if callable(flow) and not isinstance(
self.attributes["flow"], DeployTimeField
):
trig = DeployTimeField("fq_name", [str, dict], None, flow, False)
trig = DeployTimeField(
"fq_name",
[str, dict],
None,
flow,
False,
print_representation=str(flow),
)
self.triggers.append(trig)
else:
self.triggers.extend(self._parse_static_triggers([flow]))
Expand All @@ -411,7 +417,9 @@ def flow_init(
# 1. flows=['FooFlow', 'BarFlow']
flows = self.attributes["flows"]
if callable(flows) and not isinstance(flows, DeployTimeField):
trig = DeployTimeField("flows", list, None, flows, False)
trig = DeployTimeField(
"flows", list, None, flows, False, print_representation=str(flows)
)
self.triggers.append(trig)
elif isinstance(flows, list):
self.triggers.extend(self._parse_static_triggers(flows))
Expand Down
Loading