diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy new file mode 100644 index 0000000000..09e1f2500f --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy @@ -0,0 +1,62 @@ +/* + * Copyright 2020-2022, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.extension + +import groovy.transform.CompileStatic +import groovyx.gpars.dataflow.DataflowReadChannel +import groovyx.gpars.dataflow.DataflowWriteChannel +import nextflow.script.ChannelOut + +/** + * Implements the {@link OperatorImpl#eval} operator + * + * @author Jacob Munro + */ +@CompileStatic +class EvalOp { + + private Object source + + private Closure closure + + EvalOp( ChannelOut source, Closure closure ) { + assert source != null + assert closure != null + + this.source = source + this.closure = closure + } + + EvalOp( DataflowReadChannel source, Closure closure ) { + assert source != null + assert closure != null + + this.source = source as DataflowWriteChannel + this.closure = closure + } + + Object apply() { + + final copy = (Closure)closure.clone() + copy.setResolveStrategy(Closure.DELEGATE_FIRST) + def result = source.with(copy) + + return result + } + +} diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy new file mode 100644 index 0000000000..a5464aa4b6 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy @@ -0,0 +1,61 @@ +/* + * Copyright 2020-2022, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.extension + +import groovy.transform.CompileStatic +import groovyx.gpars.dataflow.DataflowReadChannel +import nextflow.script.ProcessDef +import nextflow.script.ChannelOut + +/** + * Implements the {@link OperatorImpl#exec} operator + * + * @author Jacob Munro + */ +@CompileStatic +class ExecOp { + + private ProcessDef processDef + + private DataflowReadChannel source + + private Object[] args + + ExecOp( DataflowReadChannel source, ProcessDef processDef, Object[] args ) { + assert processDef != null + assert source != null + + this.source = source + this.processDef = processDef + this.args = args + } + + Object apply() { + + def result = processDef.run(resolveInputs()) + + return result + } + + private Object[] resolveInputs() { + + Object[] resultArray = ([source] + (args as List)).toArray() + + return ChannelOut.spread(resultArray).toArray() + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OpCall.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OpCall.groovy index 8b7f9cf41c..f85d4c4189 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OpCall.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OpCall.groovy @@ -27,6 +27,10 @@ class OpCall implements Callable { final static private List SPECIAL_NAMES = ["choice","merge","separate"] final static private String SET_OP_hack = 'set' + + final static private String EVAL_OP_hack = 'eval' + + final static private String EXEC_OP_hack = 'exec' static ThreadLocal current = new ThreadLocal<>() @@ -78,8 +82,20 @@ class OpCall implements Callable { return this } - if( args.size() ) + if( methodName == EVAL_OP_hack ) { + source = left[0] as DataflowWriteChannel + args = ([left] as Object[]) + args + return this + } + + if( args.size() ) { + if( methodName == EXEC_OP_hack ) { + source = left[0] as DataflowWriteChannel + args = (left[1..-1] as Object[]) + args + return this + } throw new ScriptRuntimeException("Multi-channel output cannot be applied to operator ${methodName} for which argument is already provided") + } source = left[0] as DataflowWriteChannel args = left[1..-1] as Object[] diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index c1602a9569..c03f475eac 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -40,6 +40,7 @@ import nextflow.Session import nextflow.script.ChannelOut import nextflow.script.TokenBranchDef import nextflow.script.TokenMultiMapDef +import nextflow.script.ProcessDef import nextflow.splitter.FastaSplitter import nextflow.splitter.FastqSplitter import nextflow.splitter.TextSplitter @@ -1493,4 +1494,32 @@ class OperatorImpl { log.warn "Operator `fork` has been renamed to `multiMap`" multiMap(source, action) } + + /** + * Implement `exec` operator + */ + Object exec(DataflowReadChannel source, ProcessDef processDef, Object... args) { + // don't set DAG Node + OpCall.current.get().ignoreDagNode = true + + return new ExecOp(source, processDef, args).apply() + } + + /* + * Implement `eval` operator + */ + Object eval(DataflowReadChannel source, Closure closure) { + // don't set DAG Node + OpCall.current.get().ignoreDagNode = true + + return new EvalOp(source, closure).apply() + } + + Object eval(DataflowReadChannel source, ChannelOut channelOut, Closure closure) { + // don't set DAG Node + OpCall.current.get().ignoreDagNode = true + + return new EvalOp(channelOut, closure).apply() + } + }