55
55
"null" : None ,
56
56
}
57
57
58
- SCATTER_JOB_PATTERN = re .compile (r"^(.+)_\d+$" )
58
+ SCATTER_JOB_PATTERN = re .compile (r"^(.+)_( \d+) $" )
59
59
60
60
CWLPROV_NONE = "https://w3id.org/cwl/prov#None"
61
61
@@ -215,6 +215,7 @@ def __init__(self, root, workflow_name=None, license=None, readme=None,
215
215
self .file_map = {}
216
216
self .manifest = self ._get_manifest ()
217
217
self .remap_names = remap_names
218
+ self .data_root = "data"
218
219
219
220
@staticmethod
220
221
def _get_step_maps (cwl_defs ):
@@ -240,11 +241,13 @@ def _get_manifest(self):
240
241
def _resolve_plan (self , activity ):
241
242
job_qname = activity .plan ()
242
243
plan = activity .provenance .entity (job_qname )
244
+ scatter_id = None
243
245
if not plan :
244
246
m = SCATTER_JOB_PATTERN .match (str (job_qname ))
245
247
if m :
246
248
plan = activity .provenance .entity (m .groups ()[0 ])
247
- return plan
249
+ scatter_id = m .groups ()[1 ]
250
+ return plan , scatter_id
248
251
249
252
def _get_hash (self , prov_param ):
250
253
k = prov_param .id .localpart
@@ -463,9 +466,11 @@ def add_action(self, crate, activity, parent_instrument=None):
463
466
"@type" : "CreateAction" ,
464
467
"name" : activity .label ,
465
468
}))
466
- plan = self ._resolve_plan (activity )
469
+ plan , scatter_id = self ._resolve_plan (activity )
467
470
plan_tag = plan .id .localpart
471
+ dest_base = Path (self .data_root )
468
472
if plan_tag == "main" :
473
+ dest_base = dest_base / "main"
469
474
assert str (activity .type ) == "wfprov:WorkflowRun"
470
475
instrument = workflow
471
476
self .roc_engine_run ["result" ] = action
@@ -480,6 +485,7 @@ def to_wf_p(k):
480
485
if parts [0 ] == "main" :
481
486
parts [0 ] = parent_instrument_fragment
482
487
plan_tag = "/" .join (parts )
488
+ dest_base = dest_base / (f"{ plan_tag } _{ scatter_id } " if scatter_id else f"{ plan_tag } " )
483
489
tool_name = self .step_maps [parent_instrument_fragment ][plan_tag ]["tool" ]
484
490
instrument = crate .dereference (f"{ workflow .id } #{ tool_name } " )
485
491
control_action = self .control_actions .get (plan_tag )
@@ -503,12 +509,14 @@ def to_wf_p(k):
503
509
action ["instrument" ] = instrument
504
510
action ["startTime" ] = activity .start ().time .isoformat ()
505
511
action ["endTime" ] = activity .end ().time .isoformat ()
506
- action ["object" ] = self .add_action_params (crate , activity , to_wf_p , "usage" )
507
- action ["result" ] = self .add_action_params (crate , activity , to_wf_p , "generation" )
512
+ action ["object" ] = self .add_action_params (crate , activity , to_wf_p , "usage" ,
513
+ dest_base / "in" if self .remap_names else "" )
514
+ action ["result" ] = self .add_action_params (crate , activity , to_wf_p , "generation" ,
515
+ dest_base / "out" if self .remap_names else "" )
508
516
for job in activity .steps ():
509
517
self .add_action (crate , job , parent_instrument = instrument )
510
518
511
- def add_action_params (self , crate , activity , to_wf_p , ptype = "usage" ):
519
+ def add_action_params (self , crate , activity , to_wf_p , ptype = "usage" , dest_base = "" ):
512
520
action_params = []
513
521
all_roles = set ()
514
522
for rel in getattr (activity , ptype )():
@@ -528,7 +536,7 @@ def add_action_params(self, crate, activity, to_wf_p, ptype="usage"):
528
536
wf_p = crate .dereference (to_wf_p (k ))
529
537
k = get_fragment (k )
530
538
v = rel .entity ()
531
- value = self .convert_param (v , crate )
539
+ value = self .convert_param (v , crate , dest_base = dest_base )
532
540
if value is None :
533
541
continue # param is optional with no default and was not set
534
542
if {"ro:Folder" , "wf4ever:File" } & set (str (_ ) for _ in v .types ()):
@@ -565,7 +573,7 @@ def _set_alternate_name(prov_param, action_p, parent=None):
565
573
if "alternateName" in parent :
566
574
action_p ["alternateName" ] = (Path (parent ["alternateName" ]) / basename ).as_posix ()
567
575
568
- def convert_param (self , prov_param , crate , convert_secondary = True , parent = None ):
576
+ def convert_param (self , prov_param , crate , convert_secondary = True , parent = None , dest_base = "" ):
569
577
type_names = frozenset (str (_ ) for _ in prov_param .types ())
570
578
secondary_files = [_ .generated_entity () for _ in prov_param .derivations ()
571
579
if str (_ .type ) == "cwlprov:SecondaryFile" ]
@@ -589,7 +597,7 @@ def convert_param(self, prov_param, crate, convert_secondary=True, parent=None):
589
597
basename = getattr (prov_param , "basename" , hash_ )
590
598
else :
591
599
basename = hash_
592
- dest = Path (parent .id if parent else "" ) / basename
600
+ dest = Path (parent .id if parent else dest_base ) / basename
593
601
action_p = crate .dereference (dest .as_posix ())
594
602
if not action_p :
595
603
source = self .manifest [hash_ ]
@@ -610,7 +618,7 @@ def convert_param(self, prov_param, crate, convert_secondary=True, parent=None):
610
618
basename = getattr (prov_param , "basename" , hash_ )
611
619
else :
612
620
basename = hash_
613
- dest = Path (parent .id if parent else "" ) / basename
621
+ dest = Path (parent .id if parent else dest_base ) / basename
614
622
action_p = crate .dereference (dest .as_posix ())
615
623
if not action_p :
616
624
action_p = crate .add_directory (dest_path = dest )
0 commit comments