Skip to content

exec and eval operators #3356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/extension/EvalOp.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2020-2022, Seqera Labs
* Copyright 2013-2019, Centre for Genomic Regulation (CRG)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.extension

import groovy.transform.CompileStatic
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.script.ChannelOut

/**
* Implements the {@link OperatorImpl#eval} operator
*
* @author Jacob Munro <[email protected]>
*/
@CompileStatic
class EvalOp {

private Object source

private Closure closure

EvalOp( ChannelOut source, Closure closure ) {
assert source != null
assert closure != null

this.source = source
this.closure = closure
}

EvalOp( DataflowReadChannel source, Closure closure ) {
assert source != null
assert closure != null

this.source = source as DataflowWriteChannel
this.closure = closure
}

Object apply() {

final copy = (Closure)closure.clone()
copy.setResolveStrategy(Closure.DELEGATE_FIRST)
def result = source.with(copy)

return result
}

}
61 changes: 61 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/extension/ExecOp.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2020-2022, Seqera Labs
* Copyright 2013-2019, Centre for Genomic Regulation (CRG)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.extension

import groovy.transform.CompileStatic
import groovyx.gpars.dataflow.DataflowReadChannel
import nextflow.script.ProcessDef
import nextflow.script.ChannelOut

/**
* Implements the {@link OperatorImpl#exec} operator
*
* @author Jacob Munro <[email protected]>
*/
@CompileStatic
class ExecOp {

private ProcessDef processDef

private DataflowReadChannel source

private Object[] args

ExecOp( DataflowReadChannel source, ProcessDef processDef, Object[] args ) {
assert processDef != null
assert source != null

this.source = source
this.processDef = processDef
this.args = args
}

Object apply() {

def result = processDef.run(resolveInputs())

return result
}

private Object[] resolveInputs() {

Object[] resultArray = ([source] + (args as List)).toArray()

return ChannelOut.spread(resultArray).toArray()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class OpCall implements Callable {
final static private List<String> SPECIAL_NAMES = ["choice","merge","separate"]

final static private String SET_OP_hack = 'set'

final static private String EVAL_OP_hack = 'eval'

final static private String EXEC_OP_hack = 'exec'

static ThreadLocal<OpCall> current = new ThreadLocal<>()

Expand Down Expand Up @@ -78,8 +82,20 @@ class OpCall implements Callable {
return this
}

if( args.size() )
if( methodName == EVAL_OP_hack ) {
source = left[0] as DataflowWriteChannel
args = ([left] as Object[]) + args
return this
}

if( args.size() ) {
if( methodName == EXEC_OP_hack ) {
source = left[0] as DataflowWriteChannel
args = (left[1..-1] as Object[]) + args
return this
}
throw new ScriptRuntimeException("Multi-channel output cannot be applied to operator ${methodName} for which argument is already provided")
}

source = left[0] as DataflowWriteChannel
args = left[1..-1] as Object[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import nextflow.Session
import nextflow.script.ChannelOut
import nextflow.script.TokenBranchDef
import nextflow.script.TokenMultiMapDef
import nextflow.script.ProcessDef
import nextflow.splitter.FastaSplitter
import nextflow.splitter.FastqSplitter
import nextflow.splitter.TextSplitter
Expand Down Expand Up @@ -1493,4 +1494,32 @@ class OperatorImpl {
log.warn "Operator `fork` has been renamed to `multiMap`"
multiMap(source, action)
}

/**
* Implement `exec` operator
*/
Object exec(DataflowReadChannel source, ProcessDef processDef, Object... args) {
// don't set DAG Node
OpCall.current.get().ignoreDagNode = true

return new ExecOp(source, processDef, args).apply()
}

/*
* Implement `eval` operator
*/
Object eval(DataflowReadChannel source, Closure closure) {
// don't set DAG Node
OpCall.current.get().ignoreDagNode = true

return new EvalOp(source, closure).apply()
}

Object eval(DataflowReadChannel source, ChannelOut channelOut, Closure closure) {
// don't set DAG Node
OpCall.current.get().ignoreDagNode = true

return new EvalOp(channelOut, closure).apply()
}

}