Skip to content

Commit e92f139

Browse files
committed
Working commit for spin
1 parent f2d2ab2 commit e92f139

File tree

13 files changed

+1048
-169
lines changed

13 files changed

+1048
-169
lines changed

metaflow/cli.py

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ def logger(body="", system_msg=False, head="", bad=False, timestamp=True, nl=Tru
113113
"step": "metaflow.cli_components.step_cmd.step",
114114
"run": "metaflow.cli_components.run_cmds.run",
115115
"resume": "metaflow.cli_components.run_cmds.resume",
116+
"spin": "metaflow.cli_components.run_cmds.spin",
117+
"spin-internal": "metaflow.cli_components.step_cmd.spin_internal",
116118
},
117119
)
118120
def cli(ctx):
@@ -151,7 +153,7 @@ def show(obj):
151153
echo_always("\n%s" % obj.graph.doc)
152154
for node_name in obj.graph.sorted_nodes:
153155
node = obj.graph[node_name]
154-
echo_always("\nStep *%s*" % node.name, err=False)
156+
echo_always("\nStep *%s* and type: *%s*" % (node.name, node.type), err=False)
155157
echo_always(node.doc if node.doc else "?", indent=True, err=False)
156158
if node.type != "end":
157159
echo_always(
@@ -349,7 +351,6 @@ def start(
349351
)
350352

351353
ctx.obj.datastore_impl.datastore_root = datastore_root
352-
353354
FlowDataStore.default_storage_impl = ctx.obj.datastore_impl
354355

355356
# At this point, we are able to resolve the user-configuration options so we can
@@ -439,14 +440,10 @@ def start(
439440
ctx.obj.event_logger = LOGGING_SIDECARS[event_logger](
440441
flow=ctx.obj.flow, env=ctx.obj.environment
441442
)
442-
ctx.obj.event_logger.start()
443-
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)
444443

445444
ctx.obj.monitor = MONITOR_SIDECARS[monitor](
446445
flow=ctx.obj.flow, env=ctx.obj.environment
447446
)
448-
ctx.obj.monitor.start()
449-
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)
450447

451448
ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0](
452449
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
@@ -461,6 +458,44 @@ def start(
461458
)
462459

463460
ctx.obj.config_options = config_options
461+
ctx.obj.is_spin = False
462+
463+
# Override values for spin
464+
if hasattr(ctx, "saved_args") and ctx.saved_args and "spin" in ctx.saved_args[0]:
465+
# For spin, we will only use the local metadata provider, datastore, environment
466+
# and null event logger and monitor
467+
ctx.obj.is_spin = True
468+
ctx.obj.spin_metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "local"][0](
469+
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
470+
)
471+
# ctx.obj.event_logger = LOGGING_SIDECARS["nullSidecarLogger"](
472+
# flow=ctx.obj.flow, env=ctx.obj.environment
473+
# )
474+
# ctx.obj.monitor = MONITOR_SIDECARS["nullSidecarMonitor"](
475+
# flow=ctx.obj.flow, env=ctx.obj.environment
476+
# )
477+
# ctx.obj.spin_datastore_impl = [d for d in DATASTORES if d.TYPE == "local"][0]
478+
ctx.obj.spin_datastore_impl = [d for d in DATASTORES if d.TYPE == "s3"][0]
479+
if datastore_root is None:
480+
datastore_root = ctx.obj.spin_datastore_impl.get_datastore_root_from_config(
481+
ctx.obj.echo
482+
)
483+
ctx.obj.spin_datastore_impl.datastore_root = datastore_root
484+
ctx.obj.spin_flow_datastore = FlowDataStore(
485+
ctx.obj.flow.name,
486+
ctx.obj.environment, # Same environment as run/resume
487+
ctx.obj.spin_metadata, # local metadata provider
488+
ctx.obj.event_logger, # null event logger
489+
ctx.obj.monitor, # null monitor
490+
storage_impl=ctx.obj.spin_datastore_impl,
491+
)
492+
493+
# Start event logger and monitor
494+
ctx.obj.event_logger.start()
495+
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)
496+
497+
ctx.obj.monitor.start()
498+
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)
464499

465500
decorators._init(ctx.obj.flow)
466501

@@ -470,14 +505,14 @@ def start(
470505
ctx.obj.flow,
471506
ctx.obj.graph,
472507
ctx.obj.environment,
473-
ctx.obj.flow_datastore,
474-
ctx.obj.metadata,
508+
ctx.obj.flow_datastore if not ctx.obj.is_spin else ctx.obj.spin_flow_datastore,
509+
ctx.obj.metadata if not ctx.obj.is_spin else ctx.obj.spin_metadata,
475510
ctx.obj.logger,
476511
echo,
477512
deco_options,
478513
)
479514

480-
# In the case of run/resume, we will want to apply the TL decospecs
515+
# In the case of run/resume/spin, we will want to apply the TL decospecs
481516
# *after* the run decospecs so that they don't take precedence. In other
482517
# words, for the same decorator, we want `myflow.py run --with foo` to
483518
# take precedence over any other `foo` decospec
@@ -493,7 +528,7 @@ def start(
493528
parameters.set_parameter_context(
494529
ctx.obj.flow.name,
495530
ctx.obj.echo,
496-
ctx.obj.flow_datastore,
531+
ctx.obj.flow_datastore if not ctx.obj.is_spin else ctx.obj.spin_flow_datastore,
497532
{
498533
k: ConfigValue(v)
499534
for k, v in ctx.obj.flow.__class__._flow_state.get(
@@ -505,9 +540,9 @@ def start(
505540
if (
506541
hasattr(ctx, "saved_args")
507542
and ctx.saved_args
508-
and ctx.saved_args[0] not in ("run", "resume")
543+
and ctx.saved_args[0] not in ("run", "resume", "spin")
509544
):
510-
# run/resume are special cases because they can add more decorators with --with,
545+
# run/resume/spin are special cases because they can add more decorators with --with,
511546
# so they have to take care of themselves.
512547
all_decospecs = ctx.obj.tl_decospecs + list(
513548
ctx.obj.environment.decospecs() or []

metaflow/cli_components/run_cmds.py

Lines changed: 135 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,17 @@
1010
from ..metaflow_current import current
1111
from ..metaflow_config import DEFAULT_DECOSPECS
1212
from ..package import MetaflowPackage
13-
from ..runtime import NativeRuntime
13+
from ..runtime import NativeRuntime, SpinRuntime
1414
from ..system import _system_logger
1515

1616
from ..tagging_util import validate_tags
17-
from ..util import get_latest_run_id, write_latest_run_id
17+
from ..util import get_latest_run_id, write_latest_run_id, get_latest_task_pathspec
1818

1919

2020
def before_run(obj, tags, decospecs):
2121
validate_tags(tags)
2222

23-
# There's a --with option both at the top-level and for the run
23+
# There's a --with option both at the top-level and for the run/resume/spin
2424
# subcommand. Why?
2525
#
2626
# "run --with shoes" looks so much better than "--with shoes run".
@@ -40,7 +40,7 @@ def before_run(obj, tags, decospecs):
4040
+ list(obj.environment.decospecs() or [])
4141
)
4242
if all_decospecs:
43-
# These decospecs are the ones from run/resume PLUS the ones from the
43+
# These decospecs are the ones from run/resume/spin PLUS the ones from the
4444
# environment (for example the @conda)
4545
decorators._attach_decorators(obj.flow, all_decospecs)
4646
decorators._init(obj.flow)
@@ -52,7 +52,11 @@ def before_run(obj, tags, decospecs):
5252
# obj.environment.init_environment(obj.logger)
5353

5454
decorators._init_step_decorators(
55-
obj.flow, obj.graph, obj.environment, obj.flow_datastore, obj.logger
55+
obj.flow,
56+
obj.graph,
57+
obj.environment,
58+
obj.flow_datastore if not obj.is_spin else obj.spin_flow_datastore,
59+
obj.logger,
5660
)
5761

5862
obj.metadata.add_sticky_tags(tags=tags)
@@ -88,6 +92,29 @@ def config_merge_cb(ctx, param, value):
8892
return tuple(list(value) + splits)
8993

9094

95+
def common_runner_options(func):
    """Add the ``--run-id-file`` and ``--runner-attribute-file`` options to a command.

    Parameters
    ----------
    func : callable
        The command callback to decorate.

    Returns
    -------
    callable
        A pass-through wrapper around ``func`` carrying both click options.
    """

    @wraps(func)
    def passthrough(*args, **kwargs):
        return func(*args, **kwargs)

    runner_attribute_file_option = click.option(
        "--runner-attribute-file",
        default=None,
        show_default=True,
        type=str,
        help="Write the metadata and pathspec of this run to the file specified. Used internally "
        "for Metaflow's Runner API.",
    )
    run_id_file_option = click.option(
        "--run-id-file",
        default=None,
        show_default=True,
        type=str,
        help="Write the ID of this run to the file specified.",
    )

    # Apply innermost-first, which is the order a decorator stack listing
    # --run-id-file above --runner-attribute-file would use.
    return run_id_file_option(runner_attribute_file_option(passthrough))
117+
91118
def common_run_options(func):
92119
@click.option(
93120
"--tag",
@@ -129,20 +156,6 @@ def common_run_options(func):
129156
"in steps.",
130157
callback=config_merge_cb,
131158
)
132-
@click.option(
133-
"--run-id-file",
134-
default=None,
135-
show_default=True,
136-
type=str,
137-
help="Write the ID of this run to the file specified.",
138-
)
139-
@click.option(
140-
"--runner-attribute-file",
141-
default=None,
142-
show_default=True,
143-
type=str,
144-
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
145-
)
146159
@wraps(func)
147160
def wrapper(*args, **kwargs):
148161
return func(*args, **kwargs)
@@ -187,6 +200,7 @@ def wrapper(*args, **kwargs):
187200
@click.command(help="Resume execution of a previous run of this flow.")
188201
@tracing.cli("cli/resume")
189202
@common_run_options
203+
@common_runner_options
190204
@click.pass_obj
191205
def resume(
192206
obj,
@@ -305,6 +319,7 @@ def resume(
305319
@click.command(help="Run the workflow locally.")
306320
@tracing.cli("cli/run")
307321
@common_run_options
322+
@common_runner_options
308323
@click.option(
309324
"--namespace",
310325
"user_namespace",
@@ -380,3 +395,104 @@ def run(
380395
)
381396
with runtime.run_heartbeat():
382397
runtime.execute()
398+
399+
400+
@click.command(help="Spins up a task for a given step from a previous run locally.")
@click.argument(
    "step-name",
    required=True,
    type=str,
)
@click.option(
    "--spin-pathspec",
    default=None,
    show_default=True,
    # The help text is assembled by implicit string concatenation, so every
    # fragment but the last must end with a space.
    help="Task ID to use when spinning up the step. The spun step will use the artifacts "
    "corresponding to this task ID. If not provided, an arbitrary task ID from the latest "
    "run will be used.",
)
@click.option(
    "--skip-decorators/--no-skip-decorators",
    is_flag=True,
    default=False,
    show_default=True,
    help="Skip decorators attached to the step.",
)
@click.option(
    "--max-log-size",
    default=10,
    show_default=True,
    help="Maximum size of stdout and stderr captured in "
    "megabytes. If a step outputs more than this to "
    "stdout/stderr, its output will be truncated.",
)
@common_runner_options
@click.pass_obj
def spin(
    obj,
    step_name,
    spin_pathspec=None,
    skip_decorators=False,
    max_log_size=None,
    run_id_file=None,
    runner_attribute_file=None,
    **kwargs
):
    """Execute a single step through a ``SpinRuntime``.

    Parameters
    ----------
    obj : object
        The click context object; it supplies the flow, graph, datastores,
        metadata providers, environment, loggers and monitor used below.
    step_name : str
        Name of the step to spin; looked up as an attribute of ``obj.flow``.
    spin_pathspec : str, optional
        Task pathspec handed to the runtime. When ``None`` it is resolved with
        ``get_latest_task_pathspec(obj.flow.name, step_name)``.
    skip_decorators : bool
        Forwarded unchanged to ``SpinRuntime``.
    max_log_size : int
        Log size limit in megabytes; converted to bytes before being passed on.
        The ``--max-log-size`` option above declares a default of 10.
    run_id_file : str, optional
        Passed to ``write_file`` together with the runtime's run ID.
    runner_attribute_file : str, optional
        When set, a JSON document describing the spun task is written here.
    **kwargs
        Forwarded to ``obj.flow._set_constants``.
    """
    # No tags and no extra decospecs are passed for a spin.
    before_run(obj, [], [])
    if spin_pathspec is None:
        spin_pathspec = get_latest_task_pathspec(obj.flow.name, step_name)

    obj.echo(
        f"Spinning up step *{step_name}* locally with task pathspec *{spin_pathspec}*"
    )
    obj.flow._set_constants(obj.graph, kwargs, obj.config_options)
    step_func = getattr(obj.flow, step_name)

    # The runtime receives both the regular metadata/datastore pair and the
    # spin-specific pair (obj.spin_metadata / obj.spin_flow_datastore).
    spin_runtime = SpinRuntime(
        obj.flow,
        obj.graph,
        obj.flow_datastore,
        obj.metadata,
        obj.environment,
        obj.package,
        obj.logger,
        obj.entrypoint,
        obj.event_logger,
        obj.monitor,
        obj.spin_metadata,
        obj.spin_flow_datastore,
        step_func,
        spin_pathspec,
        skip_decorators,
        max_log_size * 1024 * 1024,  # megabytes -> bytes
    )
    _system_logger.log_event(
        level="info",
        module="metaflow.task",
        name="spin",
        payload={
            "msg": str(
                {
                    "step_name": step_name,
                    "task_pathspec": spin_pathspec,
                }
            )
        },
    )

    # The run ID is recorded before execution starts.
    write_latest_run_id(obj, spin_runtime.run_id)
    write_file(run_id_file, spin_runtime.run_id)
    spin_runtime.execute()

    if runner_attribute_file:
        with open(runner_attribute_file, "w") as f:
            json.dump(
                {
                    "task_id": spin_runtime.task.task_id,
                    "step_name": step_name,
                    "run_id": spin_runtime.run_id,
                    "flow_name": obj.flow.name,
                    "metadata": f"{obj.spin_metadata.__class__.TYPE}@{obj.spin_metadata.__class__.INFO}",
                },
                f,
            )

0 commit comments

Comments
 (0)