Skip to content

Commit 555abb5

Browse files
author
David Marchant
committed
tweak to workflow report to link by parents, to make identifying loops of jobs easier
git-svn-id: svn+ssh://svn.code.sf.net/p/migrid/code/trunk@5094 b75ad72c-e7d7-11dd-a971-7dbc132099af
1 parent 05c2afb commit 555abb5

File tree

1 file changed

+34
-23
lines changed

1 file changed

+34
-23
lines changed

mig/shared/workflows.py

+34-23
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ def get_workflow_job_report(configuration, vgrid):
234234
return (False, feedback)
235235

236236
workflow_history = {}
237-
inputs = {}
237+
outputs = {}
238238
for (_, _, files) in os.walk(history_home):
239239
for filename in files:
240240
job_history_path = os.path.join(history_home, filename)
@@ -258,7 +258,6 @@ def get_workflow_job_report(configuration, vgrid):
258258
job_history['children'] = []
259259

260260
job_id = job_history['job_id']
261-
start = job_history['start']
262261
trigger_path = job_history['trigger_path']
263262

264263
if trigger_path.startswith(configuration.vgrid_files_home):
@@ -267,30 +266,35 @@ def get_workflow_job_report(configuration, vgrid):
267266
if trigger_path.startswith(configuration.vgrid_files_writable):
268267
trigger_path = trigger_path[
269268
len(configuration.vgrid_files_writable):]
269+
# Hide internal structure of the mig from the report
270+
job_history['trigger_path'] = trigger_path
270271

271272
workflow_history[job_id] = job_history
272-
if trigger_path in inputs:
273-
inputs[trigger_path].append((job_id, start))
274-
inputs[trigger_path].sort(key=lambda x: x[1])
275-
else:
276-
inputs[trigger_path] = [(job_id, start)]
273+
274+
for write_path, write_time in job_history['write']:
275+
if write_path in outputs:
276+
outputs[write_path].append((job_id, write_time))
277+
outputs[write_path].sort(key=lambda x: x[1], reverse=True)
278+
else:
279+
outputs[write_path] = [(job_id, write_time)]
277280

278281
for job_id, entry in workflow_history.items():
279-
for write_path, write_time in entry['write']:
280-
if write_path not in inputs:
281-
continue
282-
possible_links = inputs[write_path]
283-
try:
284-
# Find job input that occurred first after our write.
285-
i = next(x[0] for x in enumerate(possible_links) if
286-
x[1][1] > write_time)
287-
child_job_id = possible_links[i][0]
288-
except StopIteration:
289-
continue
290-
if child_job_id not in entry['children']:
291-
entry['children'].append(child_job_id)
292-
child_job = workflow_history[child_job_id]
293-
child_job['parents'].append(job_id)
282+
trigger_path = entry['trigger_path']
283+
start_time = entry['start']
284+
if trigger_path not in outputs:
285+
continue
286+
possible_links = outputs[trigger_path]
287+
try:
288+
# Find job output that occurred last before our start.
289+
i = next(x[0] for x in enumerate(possible_links) if
290+
x[1][1] < start_time)
291+
parent_job_id = possible_links[i][0]
292+
except StopIteration:
293+
continue
294+
if parent_job_id not in entry['parents']:
295+
entry['parents'].append(parent_job_id)
296+
parent_job = workflow_history[parent_job_id]
297+
parent_job['children'].append(job_id)
294298

295299
return (True, workflow_history)
296300

@@ -3381,6 +3385,13 @@ def __search_workflow_p_graph(configuration, vgrid):
33813385
reset_workflows(conf, default_vgrid)
33823386
if args[0] == 'job_report':
33833387
if len(args) > 1 and args[1]:
3384-
get_workflow_job_report(conf, args[1])
3388+
status, report = get_workflow_job_report(conf, args[1])
3389+
print(status)
3390+
for job_id, job in report.items():
3391+
print(job_id)
3392+
print(' parents:')
3393+
print(' %s' % job['parents'])
3394+
print(' children:')
3395+
print(' %s' % job['children'])
33853396
else:
33863397
print('job report requires vgrid')

0 commit comments

Comments
 (0)