Skip to content

[Do Not Merge] Dev/spin v2 #2328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 55 additions & 11 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ def config_merge_cb(ctx, param, value):
"step": "metaflow.cli_components.step_cmd.step",
"run": "metaflow.cli_components.run_cmds.run",
"resume": "metaflow.cli_components.run_cmds.resume",
"spin": "metaflow.cli_components.run_cmds.spin",
"spin-internal": "metaflow.cli_components.step_cmd.spin_internal",
},
)
def cli(ctx):
Expand Down Expand Up @@ -256,6 +258,12 @@ def version(obj):
type=click.Choice([m.TYPE for m in METADATA_PROVIDERS]),
help="Metadata service type",
)
# @click.option(
# "--spin-metadata",
# default="local",
# show_default=True,
# help="Spin Metadata service type",
# )
@click.option(
"--environment",
default=DEFAULT_ENVIRONMENT,
Expand All @@ -272,6 +280,14 @@ def version(obj):
help="Data backend type",
is_eager=True,
)
# @click.option(
# "--spin-datastore",
# default=DEFAULT_DATASTORE,
# show_default=True,
# type=click.Choice([d.TYPE for d in DATASTORES]),
# help="Data backend type",
# is_eager=True,
# )
@click.option("--datastore-root", help="Root path for datastore")
@click.option(
"--package-suffixes",
Expand Down Expand Up @@ -384,7 +400,6 @@ def start(
# second one processed will return the actual options. The order of processing
# depends on what (and in what order) the user specifies on the command line.
config_options = config_file or config_value

if (
hasattr(ctx, "saved_args")
and ctx.saved_args
Expand Down Expand Up @@ -462,14 +477,10 @@ def start(
ctx.obj.event_logger = LOGGING_SIDECARS[event_logger](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.event_logger.start()
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)

ctx.obj.monitor = MONITOR_SIDECARS[monitor](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
Expand All @@ -484,6 +495,39 @@ def start(
)

ctx.obj.config_options = config_options
ctx.obj.is_spin = False

# Add new top-level options for spin and spin-internal commands
# print(f"ctx.saved_args is {ctx.saved_args}")
if hasattr(ctx, "saved_args") and ctx.saved_args and "spin" in ctx.saved_args[0]:
# For spin, we will only use the local metadata provider, datastore, environment
# and null event logger and monitor
ctx.obj.is_spin = True
ctx.obj.spin_metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "local"][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
)
ctx.obj.spin_datastore_impl = [d for d in DATASTORES if d.TYPE == "local"][0]
if datastore_root is None:
datastore_root = ctx.obj.spin_datastore_impl.get_datastore_root_from_config(
ctx.obj.echo
)
ctx.obj.spin_datastore_impl.datastore_root = datastore_root
ctx.obj.spin_flow_datastore = FlowDataStore(
ctx.obj.flow.name,
ctx.obj.environment,
ctx.obj.spin_metadata, # local metadata provider
ctx.obj.event_logger,
ctx.obj.monitor,
storage_impl=ctx.obj.spin_datastore_impl,
)
# print(f"ctx.obj.spin_flow_datastore: {ctx.obj.spin_flow_datastore}")

# Start event logger and monitor
ctx.obj.event_logger.start()
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)

ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

decorators._init(ctx.obj.flow)

Expand All @@ -493,14 +537,14 @@ def start(
ctx.obj.flow,
ctx.obj.graph,
ctx.obj.environment,
ctx.obj.flow_datastore,
ctx.obj.metadata,
ctx.obj.flow_datastore if not ctx.obj.is_spin else ctx.obj.spin_flow_datastore,
ctx.obj.metadata if not ctx.obj.is_spin else ctx.obj.spin_metadata,
ctx.obj.logger,
echo,
deco_options,
)

# In the case of run/resume, we will want to apply the TL decospecs
# In the case of run/resume/spin, we will want to apply the TL decospecs
# *after* the run decospecs so that they don't take precedence. In other
# words, for the same decorator, we want `myflow.py run --with foo` to
# take precedence over any other `foo` decospec
Expand All @@ -516,7 +560,7 @@ def start(
parameters.set_parameter_context(
ctx.obj.flow.name,
ctx.obj.echo,
ctx.obj.flow_datastore,
ctx.obj.flow_datastore if not ctx.obj.is_spin else ctx.obj.spin_flow_datastore,
{
k: ConfigValue(v)
for k, v in ctx.obj.flow.__class__._flow_state.get(
Expand All @@ -528,9 +572,9 @@ def start(
if (
hasattr(ctx, "saved_args")
and ctx.saved_args
and ctx.saved_args[0] not in ("run", "resume")
and ctx.saved_args[0] not in ("run", "resume", "spin")
):
# run/resume are special cases because they can add more decorators with --with,
# run/resume/spin are special cases because they can add more decorators with --with,
# so they have to take care of themselves.
all_decospecs = ctx.obj.tl_decospecs + list(
ctx.obj.environment.decospecs() or []
Expand Down
142 changes: 123 additions & 19 deletions metaflow/cli_components/run_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
from ..graph import FlowGraph
from ..metaflow_current import current
from ..package import MetaflowPackage
from ..runtime import NativeRuntime
from ..runtime import NativeRuntime, SpinRuntime
from ..system import _system_logger

from ..tagging_util import validate_tags
from ..util import get_latest_run_id, write_latest_run_id
from ..util import get_latest_run_id, write_latest_run_id, get_latest_task_pathspec


def before_run(obj, tags, decospecs):
validate_tags(tags)

# There's a --with option both at the top-level and for the run
# There's a --with option both at the top-level and for the run/resume/spin
# subcommand. Why?
#
# "run --with shoes" looks so much better than "--with shoes run".
Expand All @@ -39,7 +39,7 @@ def before_run(obj, tags, decospecs):
+ list(obj.environment.decospecs() or [])
)
if all_decospecs:
# These decospecs are the ones from run/resume PLUS the ones from the
# These decospecs are the ones from run/resume/spin PLUS the ones from the
# environment (for example the @conda)
decorators._attach_decorators(obj.flow, all_decospecs)
decorators._init(obj.flow)
Expand All @@ -51,7 +51,11 @@ def before_run(obj, tags, decospecs):
# obj.environment.init_environment(obj.logger)

decorators._init_step_decorators(
obj.flow, obj.graph, obj.environment, obj.flow_datastore, obj.logger
obj.flow,
obj.graph,
obj.environment,
obj.flow_datastore if not obj.is_spin else obj.spin_flow_datastore,
obj.logger,
)

obj.metadata.add_sticky_tags(tags=tags)
Expand All @@ -70,6 +74,28 @@ def write_file(file_path, content):
f.write(str(content))


def common_runner_options(func):
    """Decorate ``func`` with the CLI options shared by runner-style commands.

    Two click options are attached to a pass-through wrapper around ``func``:
    ``--run-id-file`` and ``--runner-attribute-file``. The wrapper forwards all
    positional and keyword arguments to ``func`` unchanged and returns its
    result.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    add_run_id_file = click.option(
        "--run-id-file",
        default=None,
        show_default=True,
        type=str,
        help="Write the ID of this run to the file specified.",
    )
    add_runner_attribute_file = click.option(
        "--runner-attribute-file",
        default=None,
        show_default=True,
        type=str,
        help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
    )

    # Stacked decorators are applied bottom-up, so the option that would be
    # written closest to the function is applied first and the topmost last.
    return add_run_id_file(add_runner_attribute_file(wrapper))


def common_run_options(func):
@click.option(
"--tag",
Expand Down Expand Up @@ -110,20 +136,6 @@ def common_run_options(func):
"option multiple times to attach multiple decorators "
"in steps.",
)
@click.option(
"--run-id-file",
default=None,
show_default=True,
type=str,
help="Write the ID of this run to the file specified.",
)
@click.option(
"--runner-attribute-file",
default=None,
show_default=True,
type=str,
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
Expand Down Expand Up @@ -167,6 +179,7 @@ def wrapper(*args, **kwargs):
@click.argument("step-to-rerun", required=False)
@click.command(help="Resume execution of a previous run of this flow.")
@common_run_options
@common_runner_options
@click.pass_obj
def resume(
obj,
Expand Down Expand Up @@ -285,6 +298,7 @@ def resume(
@click.command(help="Run the workflow locally.")
@tracing.cli_entrypoint("cli/run")
@common_run_options
@common_runner_options
@click.option(
"--namespace",
"user_namespace",
Expand Down Expand Up @@ -360,3 +374,93 @@ def run(
f,
)
runtime.execute()


@click.command(help="Spins up a step locally")
@click.argument(
    "step-name",
    required=True,
    type=str,
)
@click.option(
    "--task-pathspec",
    default=None,
    show_default=True,
    # Adjacent string literals are joined verbatim, so the separating space
    # must be part of one of the pieces (it was missing after "artifacts").
    help="Task ID to use when spinning up the step. The spinned up step will use the artifacts "
    "corresponding to this task ID. If not provided, an arbitrary task ID from the latest run will be used.",
)
@click.option(
    "--skip-decorators/--no-skip-decorators",
    is_flag=True,
    default=False,
    show_default=True,
    help="Skip decorators attached to the step.",
)
@common_runner_options
@click.pass_obj
def spin(
    obj,
    step_name,
    task_pathspec=None,
    skip_decorators=False,
    run_id_file=None,
    runner_attribute_file=None,
    **kwargs
):
    """Execute a single step of the flow through a ``SpinRuntime``.

    Parameters
    ----------
    obj
        The click context object injected by ``click.pass_obj``. This command
        reads its flow, graph, datastores, metadata providers, environment,
        package, loggers, monitor and entrypoint attributes.
    step_name : str
        Name of the step to spin; looked up as an attribute of ``obj.flow``.
    task_pathspec : str, optional
        Pathspec of the task handed to the runtime. When ``None``, it is
        resolved with ``get_latest_task_pathspec(obj.flow.name, step_name)``.
    skip_decorators : bool
        Forwarded unchanged to ``SpinRuntime``.
    run_id_file : str, optional
        Passed to ``write_file`` together with the runtime's run id.
    runner_attribute_file : str, optional
        When set, a JSON document describing the spun task (task id, step
        name, run id, flow name and metadata descriptor) is written to this
        path once execution has finished.
    **kwargs
        Remaining command-line values, forwarded to
        ``obj.flow._set_constants``.
    """
    # Spin takes no tags and no command-level decospecs of its own.
    before_run(obj, [], [])
    if task_pathspec is None:
        task_pathspec = get_latest_task_pathspec(obj.flow.name, step_name)

    obj.echo(
        f"Spinning up step *{step_name}* locally with task pathspec *{task_pathspec}*"
    )
    obj.flow._set_constants(obj.graph, kwargs, obj.config_options)
    step_func = getattr(obj.flow, step_name)

    # The runtime receives both the regular metadata/datastore objects and the
    # spin-specific ones set up on the context object.
    spin_runtime = SpinRuntime(
        obj.flow,
        obj.graph,
        obj.flow_datastore,
        obj.metadata,
        obj.environment,
        obj.package,
        obj.logger,
        obj.entrypoint,
        obj.event_logger,
        obj.monitor,
        obj.spin_metadata,
        obj.spin_flow_datastore,
        step_func,
        task_pathspec,
        skip_decorators,
    )
    _system_logger.log_event(
        level="info",
        module="metaflow.task",
        name="spin",
        payload={
            "msg": str(
                {
                    "step_name": step_name,
                    "task_pathspec": task_pathspec,
                }
            )
        },
    )

    # The run id is recorded before the step is actually executed.
    write_latest_run_id(obj, spin_runtime.run_id)
    write_file(run_id_file, spin_runtime.run_id)
    spin_runtime.execute()

    # Written only after execute(); the payload reads spin_runtime.task.
    if runner_attribute_file:
        with open(runner_attribute_file, "w") as f:
            json.dump(
                {
                    "task_id": spin_runtime.task.task_id,
                    "step_name": step_name,
                    "run_id": spin_runtime.run_id,
                    "flow_name": obj.flow.name,
                    "metadata": f"{obj.spin_metadata.__class__.TYPE}@{obj.spin_metadata.__class__.INFO}",
                },
                f,
            )
Loading