From ccdd76b911776c66ef9ccecd1a634e9d3fdd59a8 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Tue, 11 Feb 2025 19:21:34 +0100 Subject: [PATCH 01/34] add custom log --- metaflow/cli_components/step_cmd.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 4b40c9e5e54..357253077d5 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -162,6 +162,7 @@ def step( retry_count, ) else: + echo(f"Custom log in step.....{step_name}") task.run_step( step_name, run_id, From 0427c73db64321bd4ba29d025770844e7dc766d0 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Tue, 11 Feb 2025 21:39:17 +0100 Subject: [PATCH 02/34] use echo_always --- metaflow/cli_components/step_cmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 357253077d5..eb39f97072f 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -162,7 +162,7 @@ def step( retry_count, ) else: - echo(f"Custom log in step.....{step_name}") + echo_always(f"Custom log in step.....{step_name}") task.run_step( step_name, run_id, From 9ab0b1cb2b668bef658f96d2ea7bb60fef9e0046 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 12 Feb 2025 09:13:11 +0100 Subject: [PATCH 03/34] debug log to spot attempt number --- metaflow/cli_components/step_cmd.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index eb39f97072f..bead1dbf665 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -162,7 +162,8 @@ def step( retry_count, ) else: - echo_always(f"Custom log in step.....{step_name}") + t_datastore = task.flow_datastore.get_task_datastore() + echo_always(f"attempt: {t_datastore.attempt}") task.run_step( step_name, run_id, From 8617d4485fba934fbfff488e385520e314d2ad30 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 12 Feb 2025 09:31:52 +0100 Subject: [PATCH 04/34] pass kwargs get_task_datastore() --- metaflow/cli_components/step_cmd.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index bead1dbf665..0db06e289c6 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -162,8 +162,15 @@ def step( retry_count, ) else: - t_datastore = task.flow_datastore.get_task_datastore() + echo_always(f" run_id: {run_id}, step_name: {step_name}, task_id: {task_id}") + t_datastore = task.flow_datastore.get_task_datastore( + run_id=run_id, + step_name=step_name, + task_id=task_id + ) echo_always(f"attempt: {t_datastore.attempt}") + retry_count = t_datastore.attempt if t_datastore.attempt else 0 + echo_always(f"retry count: {retry_count}") task.run_step( step_name, run_id, From cfc0624090074f6fbf53ba5f68d37ddbab459fe7 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 12 Feb 2025 09:47:04 +0100 Subject: [PATCH 05/34] use try-excpet to mitigate datastore error --- metaflow/cli_components/step_cmd.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 0db06e289c6..b0e7429633d 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -162,14 +162,18 @@ def step( retry_count, ) else: - echo_always(f" run_id: {run_id}, 
step_name: {step_name}, task_id: {task_id}") - t_datastore = task.flow_datastore.get_task_datastore( - run_id=run_id, - step_name=step_name, - task_id=task_id - ) - echo_always(f"attempt: {t_datastore.attempt}") - retry_count = t_datastore.attempt if t_datastore.attempt else 0 + from metaflow.datastore.exceptions import DataException + + echo_always(f" run_id: {run_id}, step_name: {step_name}, task_id: {task_id}") + try: + t_datastore = task.flow_datastore.get_task_datastore( + run_id=run_id, + step_name=step_name, + task_id=task_id + ) + retry_count = t_datastore.attempt + except DataException: + retry_count = 0 echo_always(f"retry count: {retry_count}") task.run_step( step_name, From 47b8dd373a3ecf72658e05d607c7a447e6976646 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 12 Feb 2025 10:14:25 +0100 Subject: [PATCH 06/34] enable allow_not_done --- metaflow/cli_components/step_cmd.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index b0e7429633d..3cf22f53e7f 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -169,7 +169,8 @@ def step( t_datastore = task.flow_datastore.get_task_datastore( run_id=run_id, step_name=step_name, - task_id=task_id + task_id=task_id, + allow_not_done=True, ) retry_count = t_datastore.attempt except DataException: From f2126ec58cdde0c740da7d667dce290c88c33b0c Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 12 Feb 2025 11:05:22 +0100 Subject: [PATCH 07/34] remove try/except block --- metaflow/cli_components/step_cmd.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 3cf22f53e7f..deb9f6044b9 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -164,17 +164,14 @@ def step( else: from metaflow.datastore.exceptions import DataException - echo_always(f" run_id: {run_id}, step_name: {step_name}, task_id: {task_id}") - try: - t_datastore = task.flow_datastore.get_task_datastore( - run_id=run_id, - step_name=step_name, - task_id=task_id, - allow_not_done=True, - ) - retry_count = t_datastore.attempt - except DataException: - retry_count = 0 + echo_always(f" run_id: {run_id}, step_name: {step_name}, task_id: {task_id}") + t_datastore = task.flow_datastore.get_task_datastore( + run_id=run_id, + step_name=step_name, + task_id=task_id, + allow_not_done=True, + ) + retry_count = t_datastore.attempt echo_always(f"retry count: {retry_count}") task.run_step( step_name, From 966ac664629071aa29f32a8e332d5c6fe7fad51d Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 12 Feb 2025 12:13:26 +0100 Subject: [PATCH 08/34] print datastore metadata --- metaflow/cli_components/step_cmd.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index deb9f6044b9..a0f808ecd1e 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -162,8 +162,6 @@ def step( retry_count, ) else: - from metaflow.datastore.exceptions import DataException - echo_always(f" run_id: {run_id}, step_name: {step_name}, task_id: {task_id}") t_datastore = task.flow_datastore.get_task_datastore( run_id=run_id, @@ -171,6 +169,7 @@ def step( task_id=task_id, allow_not_done=True, ) + echo_always(f"data store metadata: {t_datastore.ds_metadata}") retry_count = t_datastore.attempt echo_always(f"retry count: 
{retry_count}") task.run_step( From 3576162e90f6e9969efce8ccf031e0329bb04892 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 12 Feb 2025 12:32:15 +0100 Subject: [PATCH 09/34] use flow_datastore intead --- metaflow/cli_components/step_cmd.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index a0f808ecd1e..5fb34b78771 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -163,6 +163,7 @@ def step( ) else: echo_always(f" run_id: {run_id}, step_name: {step_name}, task_id: {task_id}") + echo_always(f"attempt: {task.flow_datastore.attempt}, ds_root: {task.flow_datastore.datastore_root}, ca_store: {task.flow_datastore.ca_store}") t_datastore = task.flow_datastore.get_task_datastore( run_id=run_id, step_name=step_name, From 326d847766250ca260c315944ef4a983309b5a31 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 12 Feb 2025 14:23:58 +0100 Subject: [PATCH 10/34] fix attrs --- metaflow/cli_components/step_cmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 5fb34b78771..5e31f8961cb 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -163,7 +163,7 @@ def step( ) else: echo_always(f" run_id: {run_id}, step_name: {step_name}, task_id: {task_id}") - echo_always(f"attempt: {task.flow_datastore.attempt}, ds_root: {task.flow_datastore.datastore_root}, ca_store: {task.flow_datastore.ca_store}") + echo_always(f"ca_store: {task.flow_datastore.ca_store.TYPE}, {task.flow_datastore.datastore_root}") t_datastore = task.flow_datastore.get_task_datastore( run_id=run_id, step_name=step_name, From f4d0acd290eab4bc04645d11884d3c8f3964ae2e Mon Sep 17 00:00:00 2001 From: Jaya Ram Kollipara <105320563+dhpikolo@users.noreply.github.com> Date: Wed, 12 Feb 2025 15:18:18 +0100 Subject: [PATCH 11/34] Update metaflow/cli_components/step_cmd.py --- metaflow/cli_components/step_cmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 5e31f8961cb..717ecf49602 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -168,7 +168,7 @@ def step( run_id=run_id, step_name=step_name, task_id=task_id, - allow_not_done=True, + allow_not_done=True, # hack to not run into DataException ) echo_always(f"data store metadata: {t_datastore.ds_metadata}") retry_count = t_datastore.attempt From 8f5044fb06aa199e944c514f0c2cba02b1ff2656 Mon Sep 17 00:00:00 2001 From: Jaya Ram Kollipara <105320563+dhpikolo@users.noreply.github.com> Date: Wed, 12 Feb 2025 15:20:34 +0100 Subject: [PATCH 12/34] Update metaflow/cli_components/step_cmd.py --- metaflow/cli_components/step_cmd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 717ecf49602..053ca52deae 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -170,7 +170,6 @@ def step( task_id=task_id, allow_not_done=True, # hack to not run into DataException ) - echo_always(f"data store metadata: {t_datastore.ds_metadata}") retry_count = t_datastore.attempt echo_always(f"retry count: {retry_count}") task.run_step( From f1aa3a65b7bd85c4db83fb89cfd638c035087d3a Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Thu, 13 Feb 2025 14:37:14 +0100 Subject: [PATCH 13/34] add try except block --- 
metaflow/cli_components/step_cmd.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 053ca52deae..83e2f83037e 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -162,15 +162,21 @@ def step( retry_count, ) else: + from metaflow.datastore.exceptions import DataException + echo_always(f" run_id: {run_id}, step_name: {step_name}, task_id: {task_id}") echo_always(f"ca_store: {task.flow_datastore.ca_store.TYPE}, {task.flow_datastore.datastore_root}") - t_datastore = task.flow_datastore.get_task_datastore( - run_id=run_id, - step_name=step_name, - task_id=task_id, - allow_not_done=True, # hack to not run into DataException - ) - retry_count = t_datastore.attempt + echo_always(task.flow_datastore.ca_store) + try: + # this will hit exception on first run + t_datastore = task.flow_datastore.get_task_datastore( + run_id=run_id, + step_name=step_name, + task_id=task_id, + ) + retry_count += getattr(t_datastore, "attempt", 0) + except DataException as e: + echo_always(f"Warning: Failed to retrieve datastore. Exception: {e}") echo_always(f"retry count: {retry_count}") task.run_step( step_name, From cc2a622efb7fca30a128d3c93f0a39c9d8ab029a Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Thu, 13 Feb 2025 14:40:13 +0100 Subject: [PATCH 14/34] correct math --- metaflow/cli_components/step_cmd.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 83e2f83037e..aef42231144 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -174,8 +174,9 @@ def step( step_name=step_name, task_id=task_id, ) - retry_count += getattr(t_datastore, "attempt", 0) + retry_count = getattr(t_datastore, "attempt", 0) + 1 except DataException as e: + retry_count = 0 echo_always(f"Warning: Failed to retrieve datastore. 
Exception: {e}") echo_always(f"retry count: {retry_count}") task.run_step( From bba65048bbc691bfe146000581c7b034cdc190f7 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Thu, 13 Feb 2025 14:48:58 +0100 Subject: [PATCH 15/34] do not use ca_store attrs --- metaflow/cli_components/step_cmd.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index aef42231144..1d2a2b52af7 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -165,8 +165,6 @@ def step( from metaflow.datastore.exceptions import DataException echo_always(f" run_id: {run_id}, step_name: {step_name}, task_id: {task_id}") - echo_always(f"ca_store: {task.flow_datastore.ca_store.TYPE}, {task.flow_datastore.datastore_root}") - echo_always(task.flow_datastore.ca_store) try: # this will hit exception on first run t_datastore = task.flow_datastore.get_task_datastore( From 5603de083b5ea7a6f4f18fe64930d4ffd0381de4 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Thu, 13 Feb 2025 15:17:29 +0100 Subject: [PATCH 16/34] improve logs --- metaflow/cli_components/step_cmd.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 1d2a2b52af7..186caedb58b 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -172,10 +172,11 @@ def step( step_name=step_name, task_id=task_id, ) - retry_count = getattr(t_datastore, "attempt", 0) + 1 + echo_always(f"latest attempt number: {t_datastore.attempt}") + retry_count = t_datastore.attempt + 1 except DataException as e: retry_count = 0 - echo_always(f"Warning: Failed to retrieve datastore. Exception: {e}") + echo_always(f"This is a first ever run, Exception: {e}") echo_always(f"retry count: {retry_count}") task.run_step( step_name, From e60595816e03500e3161d96fb3ca58050fbf4dd5 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Thu, 13 Feb 2025 15:40:08 +0100 Subject: [PATCH 17/34] use getattr --- metaflow/cli_components/step_cmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 186caedb58b..69226735d22 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -173,7 +173,7 @@ def step( task_id=task_id, ) echo_always(f"latest attempt number: {t_datastore.attempt}") - retry_count = t_datastore.attempt + 1 + retry_count = getattr(t_datastore, "attempt", 0) + 1 except DataException as e: retry_count = 0 echo_always(f"This is a first ever run, Exception: {e}") From f2829d24fe4b285a69cac6aea9cb1c10ce70143f Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Thu, 13 Feb 2025 17:03:36 +0100 Subject: [PATCH 18/34] set allow_not_done = True --- metaflow/cli_components/step_cmd.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 69226735d22..cd8961764b0 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -171,6 +171,7 @@ def step( run_id=run_id, step_name=step_name, task_id=task_id, + allow_not_done=True ) echo_always(f"latest attempt number: {t_datastore.attempt}") retry_count = getattr(t_datastore, "attempt", 0) + 1 @@ -178,6 +179,9 @@ def step( retry_count = 0 echo_always(f"This is a first ever run, Exception: {e}") echo_always(f"retry count: {retry_count}") + # Not sure what are the side effects to this. 
+ # if retry_count >= max_user_code_retries: + # max_user_code_retries = retry_count task.run_step( step_name, run_id, From 5f7d50d36d0b1a4d70c17aca30ca0e95fcac5d3d Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Fri, 14 Feb 2025 12:24:05 +0100 Subject: [PATCH 19/34] use taskdatastores instead --- metaflow/cli_components/step_cmd.py | 28 +++++++++------------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index cd8961764b0..677fdd120c0 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -162,26 +162,16 @@ def step( retry_count, ) else: - from metaflow.datastore.exceptions import DataException - - echo_always(f" run_id: {run_id}, step_name: {step_name}, task_id: {task_id}") - try: - # this will hit exception on first run - t_datastore = task.flow_datastore.get_task_datastore( - run_id=run_id, - step_name=step_name, - task_id=task_id, - allow_not_done=True - ) - echo_always(f"latest attempt number: {t_datastore.attempt}") - retry_count = getattr(t_datastore, "attempt", 0) + 1 - except DataException as e: - retry_count = 0 - echo_always(f"This is a first ever run, Exception: {e}") - echo_always(f"retry count: {retry_count}") + t_datastores = task.flow_datastore.get_task_datastores( + pathspecs=[f"{run_id}/{step_name}/{task_id}"], + include_prior=True, + ) + latest_done_attempt = max([t.attempt for t in t_datastores], default=-1) # default=-1, this is first run. + retry_count = latest_done_attempt + 1 + echo_always(f"{retry_count=}") # Not sure what are the side effects to this. - # if retry_count >= max_user_code_retries: - # max_user_code_retries = retry_count + if retry_count >= max_user_code_retries: + max_user_code_retries = retry_count task.run_step( step_name, run_id, From 8fc9b37a991b840fac70d2c7376274b694784e2e Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Fri, 14 Feb 2025 12:26:28 +0100 Subject: [PATCH 20/34] add logs --- metaflow/cli_components/step_cmd.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 677fdd120c0..c58450ffd80 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -168,6 +168,7 @@ def step( ) latest_done_attempt = max([t.attempt for t in t_datastores], default=-1) # default=-1, this is first run. retry_count = latest_done_attempt + 1 + echo_always(f"{latest_done_attempt=}") echo_always(f"{retry_count=}") # Not sure what are the side effects to this. if retry_count >= max_user_code_retries: From ca8736f10eb6377af85251b3fa4fc523e61bcc1d Mon Sep 17 00:00:00 2001 From: Jaya Ram Kollipara <105320563+dhpikolo@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:16:55 +0100 Subject: [PATCH 21/34] Update metaflow/cli_components/step_cmd.py --- metaflow/cli_components/step_cmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index c58450ffd80..aa223b14bda 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -166,7 +166,7 @@ def step( pathspecs=[f"{run_id}/{step_name}/{task_id}"], include_prior=True, ) - latest_done_attempt = max([t.attempt for t in t_datastores], default=-1) # default=-1, this is first run. + latest_done_attempt = max([t.attempt for t in t_datastores], default=-1) # default=-1, when no successful done_attempts found. 
retry_count = latest_done_attempt + 1 echo_always(f"{latest_done_attempt=}") echo_always(f"{retry_count=}") From 98b75792d103ad8438bcb02c127fbfbf5a7f0d37 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Fri, 14 Feb 2025 14:31:28 +0100 Subject: [PATCH 22/34] save task std logs --- metaflow/mflog/save_logs.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/metaflow/mflog/save_logs.py b/metaflow/mflog/save_logs.py index 0ff6706e6f9..158df76ab68 100644 --- a/metaflow/mflog/save_logs.py +++ b/metaflow/mflog/save_logs.py @@ -21,7 +21,8 @@ def _read_file(path): # these env vars are set by mflog.mflog_env pathspec = os.environ["MF_PATHSPEC"] - attempt = os.environ["MF_ATTEMPT"] + # Not using this anymore, since we infer attempt number from flowdatastore + # attempt = os.environ["MF_ATTEMPT"] ds_type = os.environ["MF_DATASTORE"] ds_root = os.environ.get("MF_DATASTORE_ROOT") paths = (os.environ["MFLOG_STDOUT"], os.environ["MFLOG_STDERR"]) @@ -37,8 +38,14 @@ def print_clean(line, **kwargs): flow_datastore = FlowDataStore( flow_name, None, storage_impl=storage_impl, ds_root=ds_root ) + # Use inferred attempt - to save task_stdout.log and task_stderr.log + t_datastores = flow_datastore.get_task_datastores( + pathspecs=[pathspec], + include_prior=True, + ) + latest_attempt = max([ds.attempt for ds in t_datastores], default=0) task_datastore = flow_datastore.get_task_datastore( - run_id, step_name, task_id, int(attempt), mode="w" + run_id, step_name, task_id, int(latest_attempt), mode="w" ) try: From 7f8959856362524ac6e6ee6dfe7fa7ccc833d4a6 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Fri, 14 Feb 2025 16:14:02 +0100 Subject: [PATCH 23/34] Revert "save task std logs" This reverts commit 98b75792d103ad8438bcb02c127fbfbf5a7f0d37. --- metaflow/mflog/save_logs.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/metaflow/mflog/save_logs.py b/metaflow/mflog/save_logs.py index 158df76ab68..0ff6706e6f9 100644 --- a/metaflow/mflog/save_logs.py +++ b/metaflow/mflog/save_logs.py @@ -21,8 +21,7 @@ def _read_file(path): # these env vars are set by mflog.mflog_env pathspec = os.environ["MF_PATHSPEC"] - # Not using this anymore, since we infer attempt number from flowdatastore - # attempt = os.environ["MF_ATTEMPT"] + attempt = os.environ["MF_ATTEMPT"] ds_type = os.environ["MF_DATASTORE"] ds_root = os.environ.get("MF_DATASTORE_ROOT") paths = (os.environ["MFLOG_STDOUT"], os.environ["MFLOG_STDERR"]) @@ -38,14 +37,8 @@ def print_clean(line, **kwargs): flow_datastore = FlowDataStore( flow_name, None, storage_impl=storage_impl, ds_root=ds_root ) - # Use inferred attempt - to save task_stdout.log and task_stderr.log - t_datastores = flow_datastore.get_task_datastores( - pathspecs=[pathspec], - include_prior=True, - ) - latest_attempt = max([ds.attempt for ds in t_datastores], default=0) task_datastore = flow_datastore.get_task_datastore( - run_id, step_name, task_id, int(latest_attempt), mode="w" + run_id, step_name, task_id, int(attempt), mode="w" ) try: From 44e05cc6186c5def0d3f516711da53990b0a3b5c Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Fri, 14 Feb 2025 17:26:48 +0100 Subject: [PATCH 24/34] refactor + infer attempt in mflog.save_logs --- metaflow/cli_components/step_cmd.py | 8 ++++---- metaflow/datastore/flow_datastore.py | 7 +++++++ metaflow/mflog/save_logs.py | 5 +++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index aa223b14bda..85806fd1195 100644 --- 
a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -162,11 +162,11 @@ def step( retry_count, ) else: - t_datastores = task.flow_datastore.get_task_datastores( - pathspecs=[f"{run_id}/{step_name}/{task_id}"], - include_prior=True, + latest_done_attempt = task.flow_datastore.get_latest_done_attempt( + run_id=run_id, + step_name=step_name, + task_id=task_id ) - latest_done_attempt = max([t.attempt for t in t_datastores], default=-1) # default=-1, when no successful done_attempts found. retry_count = latest_done_attempt + 1 echo_always(f"{latest_done_attempt=}") echo_always(f"{retry_count=}") diff --git a/metaflow/datastore/flow_datastore.py b/metaflow/datastore/flow_datastore.py index 16318ed7693..85d0635c61c 100644 --- a/metaflow/datastore/flow_datastore.py +++ b/metaflow/datastore/flow_datastore.py @@ -67,6 +67,13 @@ def __init__( def datastore_root(self): return self._storage_impl.datastore_root + def get_latest_done_attempt(self, run_id, step_name, task_id) -> int: + t_datastores = self.get_task_datastores( + pathspecs=[f"{run_id}/{step_name}/{task_id}"], + include_prior=True + ) + return max([t.attempt for t in t_datastores], default=-1) # default=-1, when no successful done_attempts found. + def get_task_datastores( self, run_id=None, diff --git a/metaflow/mflog/save_logs.py b/metaflow/mflog/save_logs.py index 0ff6706e6f9..86819a7b8ac 100644 --- a/metaflow/mflog/save_logs.py +++ b/metaflow/mflog/save_logs.py @@ -21,7 +21,6 @@ def _read_file(path): # these env vars are set by mflog.mflog_env pathspec = os.environ["MF_PATHSPEC"] - attempt = os.environ["MF_ATTEMPT"] ds_type = os.environ["MF_DATASTORE"] ds_root = os.environ.get("MF_DATASTORE_ROOT") paths = (os.environ["MFLOG_STDOUT"], os.environ["MFLOG_STDERR"]) @@ -37,8 +36,10 @@ def print_clean(line, **kwargs): flow_datastore = FlowDataStore( flow_name, None, storage_impl=storage_impl, ds_root=ds_root ) + # Use inferred attempt - to save task_stdout.log and task_stderr.log + latest_done_attempt = flow_datastore.get_latest_done_attempt(run_id=run_id, step_name=step_name, task_id=task_id) task_datastore = flow_datastore.get_task_datastore( - run_id, step_name, task_id, int(attempt), mode="w" + run_id, step_name, task_id, int(latest_done_attempt), mode="w" ) try: From b5bb0505ff4a79e6fca2153649a289cecd022983 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Mon, 17 Feb 2025 12:28:51 +0100 Subject: [PATCH 25/34] change default to 0 --- metaflow/cli_components/step_cmd.py | 4 +++- metaflow/datastore/flow_datastore.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 85806fd1195..5b6780a525c 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -167,7 +167,9 @@ def step( step_name=step_name, task_id=task_id ) - retry_count = latest_done_attempt + 1 + retry_count = 0 + if not latest_done_attempt == 0: + retry_count += latest_done_attempt echo_always(f"{latest_done_attempt=}") echo_always(f"{retry_count=}") # Not sure what are the side effects to this. 
diff --git a/metaflow/datastore/flow_datastore.py b/metaflow/datastore/flow_datastore.py index 85d0635c61c..053a047d000 100644 --- a/metaflow/datastore/flow_datastore.py +++ b/metaflow/datastore/flow_datastore.py @@ -72,7 +72,7 @@ def get_latest_done_attempt(self, run_id, step_name, task_id) -> int: pathspecs=[f"{run_id}/{step_name}/{task_id}"], include_prior=True ) - return max([t.attempt for t in t_datastores], default=-1) # default=-1, when no successful done_attempts found. + return max([t.attempt for t in t_datastores], default=0) # returns default, if this was a first attempt. def get_task_datastores( self, From b69f81d403c3bea6d8ed9c6bf8eb43b87c81c3c8 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Mon, 17 Feb 2025 14:31:16 +0100 Subject: [PATCH 26/34] remove casting to int --- metaflow/mflog/save_logs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/mflog/save_logs.py b/metaflow/mflog/save_logs.py index e95872401a0..729eecd8cea 100644 --- a/metaflow/mflog/save_logs.py +++ b/metaflow/mflog/save_logs.py @@ -39,7 +39,7 @@ def print_clean(line, **kwargs): # Use inferred attempt - to save task_stdout.log and task_stderr.log latest_done_attempt = flow_datastore.get_latest_done_attempt(run_id=run_id, step_name=step_name, task_id=task_id) task_datastore = flow_datastore.get_task_datastore( - run_id, step_name, task_id, int(latest_done_attempt), mode="w" + run_id, step_name, task_id, latest_done_attempt, mode="w" ) try: From 10ec80fa81f5a9996bab3212cc900f9022e91a1b Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Mon, 17 Feb 2025 15:38:40 +0100 Subject: [PATCH 27/34] set default to None --- metaflow/cli_components/step_cmd.py | 5 ++--- metaflow/datastore/flow_datastore.py | 5 +++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index fe5e4bf0444..df6b879c097 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -169,9 +169,8 @@ def step( step_name=step_name, task_id=task_id ) - retry_count = 0 - if not latest_done_attempt == 0: - retry_count += latest_done_attempt + if latest_done_attempt: + retry_count = latest_done_attempt + 1 echo_always(f"{latest_done_attempt=}") echo_always(f"{retry_count=}") # Not sure what are the side effects to this. diff --git a/metaflow/datastore/flow_datastore.py b/metaflow/datastore/flow_datastore.py index 053a047d000..d8beec6d9c7 100644 --- a/metaflow/datastore/flow_datastore.py +++ b/metaflow/datastore/flow_datastore.py @@ -1,5 +1,6 @@ import itertools import json +from typing import Optional from .. import metaflow_config @@ -67,12 +68,12 @@ def __init__( def datastore_root(self): return self._storage_impl.datastore_root - def get_latest_done_attempt(self, run_id, step_name, task_id) -> int: + def get_latest_done_attempt(self, run_id, step_name, task_id) -> Optional[int]: t_datastores = self.get_task_datastores( pathspecs=[f"{run_id}/{step_name}/{task_id}"], include_prior=True ) - return max([t.attempt for t in t_datastores], default=0) # returns default, if this was a first attempt. + return max([t.attempt for t in t_datastores], default=None) # returns default, if this was a first attempt. 
def get_task_datastores( self, From e815cf693db48ed0f4b55e45e4d8184a601ef5ef Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Mon, 17 Feb 2025 16:40:39 +0100 Subject: [PATCH 28/34] read from conf --- metaflow/metaflow_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index 415a934cbe4..9bb44d2b871 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -507,7 +507,7 @@ # # Note also that DataStoreSet resolves the latest attempt_id using # lexicographic ordering of attempts. This won't work if MAX_ATTEMPTS > 99. -MAX_ATTEMPTS = 6 +MAX_ATTEMPTS = from_conf("MAX_ATTEMPTS", 6) # Feature flag (experimental features that are *explicitly* unsupported) From 8959ae5466ec5c27ee8172a1d2acdd835c317b4d Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Tue, 18 Feb 2025 10:08:33 +0100 Subject: [PATCH 29/34] move get latest done attempts before if-else --- metaflow/cli_components/step_cmd.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index df6b879c097..982d2e6c718 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -102,7 +102,7 @@ def step( input_paths_filename=None, split_index=None, opt_namespace=None, - retry_count=None, + retry_count=None, # TODO remove this from interface, look for effecting codepaths max_user_code_retries=None, clone_only=None, clone_run_id=None, @@ -155,6 +155,15 @@ def step( ctx.obj.monitor, ubf_context, ) + latest_done_attempt = task.flow_datastore.get_latest_done_attempt( + run_id=run_id, step_name=step_name, task_id=task_id + ) + if latest_done_attempt: + retry_count = latest_done_attempt + 1 + # Not sure what are the side effects to this. + if retry_count >= max_user_code_retries: + max_user_code_retries = retry_count + if clone_only: task.clone_only( step_name, @@ -164,18 +173,6 @@ def step( retry_count, ) else: - latest_done_attempt = task.flow_datastore.get_latest_done_attempt( - run_id=run_id, - step_name=step_name, - task_id=task_id - ) - if latest_done_attempt: - retry_count = latest_done_attempt + 1 - echo_always(f"{latest_done_attempt=}") - echo_always(f"{retry_count=}") - # Not sure what are the side effects to this. - if retry_count >= max_user_code_retries: - max_user_code_retries = retry_count task.run_step( step_name, run_id, From 71770640cfb1a7a8b4a340dc4dfcb5bda1fc22d4 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 19 Feb 2025 11:29:34 +0100 Subject: [PATCH 30/34] mount MAX_ATTEMPTS on kubernetes job --- metaflow/plugins/kubernetes/kubernetes.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index 6625047395a..da54319f77d 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -39,6 +39,7 @@ SERVICE_HEADERS, KUBERNETES_SECRETS, SERVICE_INTERNAL_URL, + MAX_ATTEMPTS, ) from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK from metaflow.metaflow_config_funcs import config_values @@ -299,6 +300,7 @@ def create_jobset( # assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes # pod; this happens when METAFLOW_DATASTORE_SYSROOT_LOCAL is NOT set ( # see get_datastore_root_from_config in datastore/local.py). 
+ .environment_variable("METAFLOW_MAX_ATTEMPTS", MAX_ATTEMPTS) ) for k in list( From 9e2faed509e537561cb9efd94ed16dbc2de25265 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 19 Feb 2025 13:14:56 +0100 Subject: [PATCH 31/34] Revert "mount MAX_ATTEMPTS on kubernetes job" This reverts commit 71770640cfb1a7a8b4a340dc4dfcb5bda1fc22d4. --- metaflow/plugins/kubernetes/kubernetes.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index da54319f77d..6625047395a 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -39,7 +39,6 @@ SERVICE_HEADERS, KUBERNETES_SECRETS, SERVICE_INTERNAL_URL, - MAX_ATTEMPTS, ) from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK from metaflow.metaflow_config_funcs import config_values @@ -300,7 +299,6 @@ def create_jobset( # assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes # pod; this happens when METAFLOW_DATASTORE_SYSROOT_LOCAL is NOT set ( # see get_datastore_root_from_config in datastore/local.py). - .environment_variable("METAFLOW_MAX_ATTEMPTS", MAX_ATTEMPTS) ) for k in list( From 0f135b686b5526f9657e111a96920d38a5e1ff3b Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 19 Feb 2025 13:39:33 +0100 Subject: [PATCH 32/34] Revert "Revert "mount MAX_ATTEMPTS on kubernetes job"" This reverts commit 9e2faed509e537561cb9efd94ed16dbc2de25265. --- metaflow/plugins/kubernetes/kubernetes.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index 6625047395a..da54319f77d 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -39,6 +39,7 @@ SERVICE_HEADERS, KUBERNETES_SECRETS, SERVICE_INTERNAL_URL, + MAX_ATTEMPTS, ) from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK from metaflow.metaflow_config_funcs import config_values @@ -299,6 +300,7 @@ def create_jobset( # assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes # pod; this happens when METAFLOW_DATASTORE_SYSROOT_LOCAL is NOT set ( # see get_datastore_root_from_config in datastore/local.py). + .environment_variable("METAFLOW_MAX_ATTEMPTS", MAX_ATTEMPTS) ) for k in list( From 55126a1beb7728a578248e92b5f3f785e8e7eb64 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 19 Feb 2025 14:26:15 +0100 Subject: [PATCH 33/34] add max attempts envar in container template --- metaflow/plugins/argo/argo_workflows.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 336d880da0c..d9978c66d72 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -46,6 +46,7 @@ SERVICE_HEADERS, SERVICE_INTERNAL_URL, UI_URL, + MAX_ATTEMPTS, ) from metaflow.metaflow_config_funcs import config_values from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars @@ -1733,6 +1734,7 @@ def _container_templates(self): "METAFLOW_KUBERNETES_FETCH_EC2_METADATA": KUBERNETES_FETCH_EC2_METADATA, "METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes", "METAFLOW_OWNER": self.username, + "METAFLOW_MAX_ATTEMPTS": MAX_ATTEMPTS, }, **{ # Configuration for Argo Events. 
Keep these in sync with the From 0fd14d32a705db1586f18f2fb637e6632b9a96f9 Mon Sep 17 00:00:00 2001 From: dhpikolo Date: Wed, 19 Feb 2025 14:26:58 +0100 Subject: [PATCH 34/34] add in k8s create job --- metaflow/plugins/kubernetes/kubernetes.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py index da54319f77d..71eb7d79cdc 100644 --- a/metaflow/plugins/kubernetes/kubernetes.py +++ b/metaflow/plugins/kubernetes/kubernetes.py @@ -604,6 +604,7 @@ def create_job_object( # assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes # pod; this happens when METAFLOW_DATASTORE_SYSROOT_LOCAL is NOT set ( # see get_datastore_root_from_config in datastore/local.py). + .environment_variable("METAFLOW_MAX_ATTEMPTS", MAX_ATTEMPTS) ) # Temporary passing of *some* environment variables. Do not rely on this
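
Where the series converges: the step command derives retry_count from the datastore via FlowDataStore.get_latest_done_attempt() (the highest completed attempt recorded for the task's pathspec, or None when the task has never run) instead of trusting the --retry-count CLI flag; MAX_ATTEMPTS becomes configurable through from_conf("MAX_ATTEMPTS", 6); and METAFLOW_MAX_ATTEMPTS is exported into the Kubernetes job spec and the Argo container templates so remote tasks honor the same limit. Below is a minimal, self-contained sketch of the attempt-derivation logic only; the function names and demo values are illustrative assumptions rather than Metaflow's actual API, and a first-ever run is treated here as attempt 0.

    from typing import Iterable, Optional

    def latest_done_attempt(done_attempts: Iterable[int]) -> Optional[int]:
        # Highest completed attempt number for a task, or None if no
        # completed attempt exists (i.e. the task has never run).
        return max(done_attempts, default=None)

    def derive_retry_count(done_attempts: Iterable[int]) -> int:
        # Retry on top of the latest completed attempt; a first-ever run
        # starts at attempt 0.
        latest = latest_done_attempt(list(done_attempts))
        return latest + 1 if latest is not None else 0

    if __name__ == "__main__":
        print(derive_retry_count([]))      # 0 -> first execution of the task
        print(derive_retry_count([0, 1]))  # 2 -> two attempts already completed

In the patches themselves the attempt numbers come from get_task_datastores(pathspecs=[...], include_prior=True), and when the derived retry_count exceeds max_user_code_retries the latter is raised to match, so the final attempt is not misclassified.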