From d92f7a928697092a25eb02d69aba8458ad39e41d Mon Sep 17 00:00:00 2001 From: jemunro Date: Tue, 22 Nov 2022 22:20:39 +1100 Subject: [PATCH 1/5] enable flexible process piping with InputPlaceholder Signed-off-by: jemunro --- .../groovy/nextflow/script/BindableDef.groovy | 8 ++- .../nextflow/script/InputPlaceholder.groovy | 68 +++++++++++++++++++ .../groovy/nextflow/script/ProcessDef.groovy | 53 +++++++++++---- .../nextflow/script/ScriptBinding.groovy | 10 ++- 4 files changed, 124 insertions(+), 15 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/script/InputPlaceholder.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy index 97361e28c1..f6ce3b7313 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy @@ -38,9 +38,14 @@ abstract class BindableDef extends ComponentDef { Object invoke_a(Object[] args) { + if (this instanceof ProcessDef && (this as ProcessDef).isPartial) { + return this.run(args) + } + // use this instance an workflow template, therefore clone it final String prefix = ExecutionStack.workflow()?.name final fqName = prefix ? prefix+SCOPE_SEP+name : name + if( this instanceof ProcessDef && !invocations.add(fqName) ) { log.debug "Bindable invocations=$invocations" final msg = "Process '$name' has been already used -- If you need to reuse the same component, include it with a different name or include it in a different workflow context" @@ -49,7 +54,7 @@ abstract class BindableDef extends ComponentDef { final comp = (prefix ? this.cloneWithName(fqName) : this.clone()) as BindableDef // invoke the process execution - final result = comp.run(args) + Object result = comp.run(args) // register this component invocation in the current context // so that it can be accessed in the outer execution scope @@ -61,5 +66,4 @@ abstract class BindableDef extends ComponentDef { } return result } - } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/InputPlaceholder.groovy b/modules/nextflow/src/main/groovy/nextflow/script/InputPlaceholder.groovy new file mode 100644 index 0000000000..559d3f6ce4 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/script/InputPlaceholder.groovy @@ -0,0 +1,68 @@ +/* + * Copyright 2020-2022, Seqera Labs + * Copyright 2013-2019, Centre for Genomic Regulation (CRG) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.script + +import groovy.transform.CompileStatic + +/** + * Placeholder for process inputs + * + * @author Jacob E Munro + */ + +@CompileStatic +class InputPlaceholder { + + private Integer index + + private String name + + InputPlaceholder() { } + + InputPlaceholder (Integer index) { + this.index = index + } + + InputPlaceholder (String name) { + this.name = name + } + + InputPlaceholder getAt(Integer i) { + new InputPlaceholder(i) + } + + InputPlaceholder getAt(String name) { + new InputPlaceholder(name) + } + + @Override + Object getProperty(String name) { + this.hasProperty(name) ? this.getProperties()[name] : new InputPlaceholder(name) + } + + Object getInput(Object object) { + + if (object instanceof ChannelOut) { + ChannelOut channelOut = object as ChannelOut + return name ? channelOut.getProperty(name) : index ? channelOut[index] : channelOut[0] + } + return object + } + + String getName() { name } +} \ No newline at end of file diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy index 7fc8539944..d7a1d362c6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy @@ -83,6 +83,11 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { */ private transient ChannelOut output + /** + * Whether the arguments are partially filled due to InputPlaceholder + */ + private boolean isPartial = false + ProcessDef(BaseScript owner, Closure body, String name ) { this.owner = owner this.rawBody = body @@ -165,18 +170,42 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { @Override Object run(Object[] args) { - // initialise process config - initialize() - - // get params - final params = ChannelOut.spread(args) - // sanity check - if( params.size() != declaredInputs.size() ) - throw new ScriptRuntimeException(missMatchErrMessage(processName, declaredInputs.size(), params.size())) - - // set input channels - for( int i=0; i Date: Thu, 16 Nov 2023 11:30:19 -0600 Subject: [PATCH 2/5] Refactor using closure syntax Signed-off-by: Ben Sherman --- .../nextflow/extension/ChannelEx.groovy | 96 +++++++++++++------ .../groovy/nextflow/script/BindableDef.groovy | 8 +- .../nextflow/script/InputPlaceholder.groovy | 68 ------------- .../groovy/nextflow/script/ProcessDef.groovy | 53 +++------- .../nextflow/script/ScriptBinding.groovy | 10 +- 5 files changed, 84 insertions(+), 151 deletions(-) delete mode 100644 modules/nextflow/src/main/groovy/nextflow/script/InputPlaceholder.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy index 99a58da28b..b3467abc8e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy @@ -122,11 +122,10 @@ class ChannelEx { } /** - * Implements pipe operation between a channel and a process or a sub-workflow + * Pipe a channel INTO a process or workflow. * - * @param left A dataflow channel instance - * @param right A {@link ChainableDef} object eg. a nextflow process - * @return The channel resulting the pipe operation + * @param left + * @param right */ static Object or(DataflowWriteChannel left, ChainableDef right) { checkContext('or', right) @@ -134,11 +133,10 @@ class ChannelEx { } /** - * Implements pipe operation between a channel WITH a operator + * Pipe a channel INTO an operator. * - * @param left A {@code DataflowWriteChannel} channel as left operand - * @param right A {@code OpCall} object representing a operator call as right operand - * @return The resulting channel object + * @param left + * @param right */ static Object or(DataflowWriteChannel left, OpCall right) { checkContext('or', right) @@ -146,11 +144,24 @@ class ChannelEx { } /** - * Implements pipe operation between a multi-channels WITH a process or a sub-workflow + * Pipe a channel INTO a closure that defines a custom + * invocation of a process, workflow, or operator. * - * @param left A {@code ChannelOut} multi-channel object as left operand - * @param right A {@code ChainableDef} object representing a process or sub-workflow call as right operand - * @return The resulting channel object + * @param left + * @param right + */ + static Object or(DataflowWriteChannel left, Closure right) { + def out = right.call(left) + if( out instanceof DataflowWriteChannel || out instanceof ChannelOut ) + return out + throw new ScriptRuntimeException("Closure component did not return a channel") + } + + /** + * Pipe a multi-channel INTO a process or workflow. + * + * @param left + * @param right */ static Object or(ChannelOut left, ChainableDef right) { checkContext('or', right) @@ -158,11 +169,10 @@ class ChannelEx { } /** - * Implements pipe operation between a multi-channels WITH a operator + * Pipe a multi-channel INTO an operator. * - * @param left A {@code ChannelOut} multi-channel object as left operand - * @param right A {@code OpCall} object representing a operator call as right operand - * @return The resulting channel object + * @param left + * @param right */ static Object or(ChannelOut left, OpCall right) { checkContext('or', right) @@ -170,14 +180,29 @@ class ChannelEx { } /** - * Implements pipe operation between a process or sub-workflow WITH a operator + * Pipe a multi-channel INTO a closure that defines a custom + * invocation of a process, workflow, or operator. * - * @param left A {@code ChainableDef} object representing a process or sub-workflow call as left operand - * @param right A {@code OpCall} object representing a operator call as right operand - * @return The resulting channel object + * @param left + * @param right */ - static Object or(ChainableDef left, OpCall right) { + static Object or(ChannelOut left, Closure right) { + def out = right.call(left) + if( out instanceof DataflowWriteChannel || out instanceof ChannelOut ) + return out + throw new ScriptRuntimeException("Closure component did not return a channel") + } + + /** + * Pipe a process or workflow INTO another process or workflow. + * + * @param left + * @param right + */ + static Object or(ChainableDef left, ChainableDef right) { checkContext('or', left) + checkContext('or', right) + def out = left.invoke_a(InvokerHelper.EMPTY_ARGS) if( out instanceof DataflowWriteChannel ) @@ -190,16 +215,33 @@ class ChannelEx { } /** - * Implements pipe operation between a process or sub-workflow WITH another process or sub-workflow + * Pipe a process or workflow INTO an operator. * - * @param left A {@code ChainableDef} object representing a process or sub-workflow call as left operand - * @param right A {@code ChainableDef} object representing a process or sub-workflow call as right operand - * @return + * @param left + * @param right */ - static Object or(ChainableDef left, ChainableDef right) { + static Object or(ChainableDef left, OpCall right) { checkContext('or', left) - checkContext('or', right) + def out = left.invoke_a(InvokerHelper.EMPTY_ARGS) + + if( out instanceof DataflowWriteChannel ) + return or((DataflowWriteChannel)out, right) + + if( out instanceof ChannelOut ) + return or((ChannelOut)out, right) + + throw new ScriptRuntimeException("Cannot pipe ${fmtType(out)} with ${fmtType(right)}") + } + /** + * Pipe a process or workflow INTO a closure that defines a custom + * invocation of a process, workflow, or operator. + * + * @param left + * @param right + */ + static Object or(ChainableDef left, Closure right) { + checkContext('or', left) def out = left.invoke_a(InvokerHelper.EMPTY_ARGS) if( out instanceof DataflowWriteChannel ) diff --git a/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy index f6ce3b7313..97361e28c1 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy @@ -38,14 +38,9 @@ abstract class BindableDef extends ComponentDef { Object invoke_a(Object[] args) { - if (this instanceof ProcessDef && (this as ProcessDef).isPartial) { - return this.run(args) - } - // use this instance an workflow template, therefore clone it final String prefix = ExecutionStack.workflow()?.name final fqName = prefix ? prefix+SCOPE_SEP+name : name - if( this instanceof ProcessDef && !invocations.add(fqName) ) { log.debug "Bindable invocations=$invocations" final msg = "Process '$name' has been already used -- If you need to reuse the same component, include it with a different name or include it in a different workflow context" @@ -54,7 +49,7 @@ abstract class BindableDef extends ComponentDef { final comp = (prefix ? this.cloneWithName(fqName) : this.clone()) as BindableDef // invoke the process execution - Object result = comp.run(args) + final result = comp.run(args) // register this component invocation in the current context // so that it can be accessed in the outer execution scope @@ -66,4 +61,5 @@ abstract class BindableDef extends ComponentDef { } return result } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/InputPlaceholder.groovy b/modules/nextflow/src/main/groovy/nextflow/script/InputPlaceholder.groovy deleted file mode 100644 index 559d3f6ce4..0000000000 --- a/modules/nextflow/src/main/groovy/nextflow/script/InputPlaceholder.groovy +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2020-2022, Seqera Labs - * Copyright 2013-2019, Centre for Genomic Regulation (CRG) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package nextflow.script - -import groovy.transform.CompileStatic - -/** - * Placeholder for process inputs - * - * @author Jacob E Munro - */ - -@CompileStatic -class InputPlaceholder { - - private Integer index - - private String name - - InputPlaceholder() { } - - InputPlaceholder (Integer index) { - this.index = index - } - - InputPlaceholder (String name) { - this.name = name - } - - InputPlaceholder getAt(Integer i) { - new InputPlaceholder(i) - } - - InputPlaceholder getAt(String name) { - new InputPlaceholder(name) - } - - @Override - Object getProperty(String name) { - this.hasProperty(name) ? this.getProperties()[name] : new InputPlaceholder(name) - } - - Object getInput(Object object) { - - if (object instanceof ChannelOut) { - ChannelOut channelOut = object as ChannelOut - return name ? channelOut.getProperty(name) : index ? channelOut[index] : channelOut[0] - } - return object - } - - String getName() { name } -} \ No newline at end of file diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy index d7a1d362c6..7fc8539944 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy @@ -83,11 +83,6 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { */ private transient ChannelOut output - /** - * Whether the arguments are partially filled due to InputPlaceholder - */ - private boolean isPartial = false - ProcessDef(BaseScript owner, Closure body, String name ) { this.owner = owner this.rawBody = body @@ -170,42 +165,18 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { @Override Object run(Object[] args) { - - if (! isPartial ) { - initialize() - - // get params - final params = ChannelOut.spread(args) - // sanity check - if( params.size() != declaredInputs.size() ) - throw new ScriptRuntimeException(missMatchErrMessage(processName, declaredInputs.size(), params.size())) - - // set input channels - for( int i=0; i Date: Thu, 16 Nov 2023 12:09:08 -0600 Subject: [PATCH 3/5] Update docs Signed-off-by: Ben Sherman --- docs/snippets/pipe-with-closure.nf | 19 ++++++++++ docs/snippets/pipe-with-closure.out | 3 ++ docs/snippets/pipe.nf | 14 ++++++++ docs/snippets/pipe.out | 3 ++ docs/snippets/process-named-stdout.nf | 18 ++++++++++ docs/snippets/process-named-stdout.out | 3 ++ docs/workflow.md | 49 ++++++++------------------ 7 files changed, 74 insertions(+), 35 deletions(-) create mode 100644 docs/snippets/pipe-with-closure.nf create mode 100644 docs/snippets/pipe-with-closure.out create mode 100644 docs/snippets/pipe.nf create mode 100644 docs/snippets/pipe.out create mode 100644 docs/snippets/process-named-stdout.nf create mode 100644 docs/snippets/process-named-stdout.out diff --git a/docs/snippets/pipe-with-closure.nf b/docs/snippets/pipe-with-closure.nf new file mode 100644 index 0000000000..d5381965bf --- /dev/null +++ b/docs/snippets/pipe-with-closure.nf @@ -0,0 +1,19 @@ +process foo { + input: + val message + val suffix + + output: + val result, emit: suffixed + + exec: + result = "${message}${suffix}" +} + +workflow { + suffix = ' world!' + channel.from('Hello','Hola','Ciao') + | map { it.toUpperCase() } + | { _ -> foo(_, suffix) } + | view +} \ No newline at end of file diff --git a/docs/snippets/pipe-with-closure.out b/docs/snippets/pipe-with-closure.out new file mode 100644 index 0000000000..5220cf2707 --- /dev/null +++ b/docs/snippets/pipe-with-closure.out @@ -0,0 +1,3 @@ +HELLO world! +HOLA world! +CIAO world! \ No newline at end of file diff --git a/docs/snippets/pipe.nf b/docs/snippets/pipe.nf new file mode 100644 index 0000000000..4fac307250 --- /dev/null +++ b/docs/snippets/pipe.nf @@ -0,0 +1,14 @@ +process foo { + input: + val message + + output: + val result + + exec: + result = "$message world" +} + +workflow { + channel.from('Hello','Hola','Ciao') | foo | map { it.toUpperCase() } | view +} \ No newline at end of file diff --git a/docs/snippets/pipe.out b/docs/snippets/pipe.out new file mode 100644 index 0000000000..fee117ac47 --- /dev/null +++ b/docs/snippets/pipe.out @@ -0,0 +1,3 @@ +HELLO WORLD +HOLA WORLD +CIAO WORLD \ No newline at end of file diff --git a/docs/snippets/process-named-stdout.nf b/docs/snippets/process-named-stdout.nf new file mode 100644 index 0000000000..2cf99bcf72 --- /dev/null +++ b/docs/snippets/process-named-stdout.nf @@ -0,0 +1,18 @@ +process sayHello { + input: + val cheers + + output: + stdout emit: verbiage + + script: + """ + echo -n $cheers + """ +} + +workflow { + things = channel.of('Hello world!', 'Yo, dude!', 'Duck!') + sayHello(things) + sayHello.out.verbiage.view() +} \ No newline at end of file diff --git a/docs/snippets/process-named-stdout.out b/docs/snippets/process-named-stdout.out new file mode 100644 index 0000000000..ffd7b891ec --- /dev/null +++ b/docs/snippets/process-named-stdout.out @@ -0,0 +1,3 @@ +Hello world! +Yo, dude! +Duck! \ No newline at end of file diff --git a/docs/workflow.md b/docs/workflow.md index 260ebeb37e..32b69ad46d 100644 --- a/docs/workflow.md +++ b/docs/workflow.md @@ -150,25 +150,8 @@ See {ref}`process-multiple-outputs` for more details. The `emit` option can also be used to name a `stdout` output: -```groovy -process sayHello { - input: - val cheers - - output: - stdout emit: verbiage - - script: - """ - echo -n $cheers - """ -} - -workflow { - things = channel.of('Hello world!', 'Yo, dude!', 'Duck!') - sayHello(things) - sayHello.out.verbiage.view() -} +```{literalinclude} snippets/process-named-stdout.nf +:language: groovy ``` :::{note} @@ -334,26 +317,12 @@ The fully qualified process name can be used as a {ref}`process selector _.suffixed.view() }` to access the `suffixed` output of process `foo`. + ### And `&` The `&` *and* operator can be used to feed multiple processes with the same channel(s). For example: From d8efa4e63ede7fe9a0de9a8381788abd7fcdf72f Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 16 Nov 2023 14:29:18 -0600 Subject: [PATCH 4/5] Compose multi-channels with `and` operator Signed-off-by: Ben Sherman --- docs/snippets/and-channel.nf | 22 ++++++++++ docs/snippets/and-channel.out | 1 + docs/snippets/and-process.nf | 29 +++++++++++++ docs/snippets/and-process.out | 2 + docs/snippets/pipe-with-closure.nf | 2 +- docs/snippets/pipe.nf | 2 +- docs/workflow.md | 43 ++++++------------- .../nextflow/extension/ChannelEx.groovy | 32 ++++++++++++++ 8 files changed, 101 insertions(+), 32 deletions(-) create mode 100644 docs/snippets/and-channel.nf create mode 100644 docs/snippets/and-channel.out create mode 100644 docs/snippets/and-process.nf create mode 100644 docs/snippets/and-process.out diff --git a/docs/snippets/and-channel.nf b/docs/snippets/and-channel.nf new file mode 100644 index 0000000000..167a04b7ff --- /dev/null +++ b/docs/snippets/and-channel.nf @@ -0,0 +1,22 @@ +process foo { + input: + val begin + val middle + val end + + output: + val result + + exec: + result = "$begin $middle $end" +} + +workflow { + ch_begin = channel.of('Hello') + ch_middle = channel.of('world') + ch_end = channel.of('!!!') + + (ch_begin & ch_middle & ch_end) + | foo + | view +} \ No newline at end of file diff --git a/docs/snippets/and-channel.out b/docs/snippets/and-channel.out new file mode 100644 index 0000000000..b528ced883 --- /dev/null +++ b/docs/snippets/and-channel.out @@ -0,0 +1 @@ +Hello world !!! \ No newline at end of file diff --git a/docs/snippets/and-process.nf b/docs/snippets/and-process.nf new file mode 100644 index 0000000000..0da14dd636 --- /dev/null +++ b/docs/snippets/and-process.nf @@ -0,0 +1,29 @@ +process foo { + input: + val message + + output: + val result + + exec: + result = "$message world" +} + +process bar { + input: + val message + + output: + val result + + exec: + result = message.toUpperCase() +} + +workflow { + channel.of('Hello') + | map { it.reverse() } + | (foo & bar) + | mix + | view +} \ No newline at end of file diff --git a/docs/snippets/and-process.out b/docs/snippets/and-process.out new file mode 100644 index 0000000000..060b22de16 --- /dev/null +++ b/docs/snippets/and-process.out @@ -0,0 +1,2 @@ +olleH world +OLLEH \ No newline at end of file diff --git a/docs/snippets/pipe-with-closure.nf b/docs/snippets/pipe-with-closure.nf index d5381965bf..d6d880f9d6 100644 --- a/docs/snippets/pipe-with-closure.nf +++ b/docs/snippets/pipe-with-closure.nf @@ -12,7 +12,7 @@ process foo { workflow { suffix = ' world!' - channel.from('Hello','Hola','Ciao') + channel.of('Hello','Hola','Ciao') | map { it.toUpperCase() } | { _ -> foo(_, suffix) } | view diff --git a/docs/snippets/pipe.nf b/docs/snippets/pipe.nf index 4fac307250..0d45bd3845 100644 --- a/docs/snippets/pipe.nf +++ b/docs/snippets/pipe.nf @@ -10,5 +10,5 @@ process foo { } workflow { - channel.from('Hello','Hola','Ciao') | foo | map { it.toUpperCase() } | view + channel.of('Hello','Hola','Ciao') | foo | map { it.toUpperCase() } | view } \ No newline at end of file diff --git a/docs/workflow.md b/docs/workflow.md index 32b69ad46d..da2e82a406 100644 --- a/docs/workflow.md +++ b/docs/workflow.md @@ -226,7 +226,7 @@ Inputs can be specified like arguments when invoking the workflow: ```groovy workflow { - my_pipeline( channel.from('/some/data') ) + my_pipeline( channel.of('/some/data') ) } ``` @@ -327,7 +327,7 @@ Statements can also be split across multiple lines for better readability: ```groovy workflow { - channel.from('Hello','Hola','Ciao') + channel.of('Hello','Hola','Ciao') | foo | map { it.toUpperCase() } | view @@ -337,7 +337,7 @@ workflow { :::{versionadded} 23.12.0-edge ::: -When using the pipe operator, the right operand can also be a closure that receives the output of the left operand and returns the result of a process, workflow, or operator invocation. This form is a useful way to define a custom mapping between the left-hand outputs and right-hand inputs, including the use of additional input channels aside from the left-hand outputs. For example: +When using the pipe operator, the right operand can also be a closure that receives the output of the left operand and returns the result of a process, workflow, or operator invocation, or any expression that produces a channel or multi-channel. This form is a useful way to define a custom mapping between the left-hand outputs and right-hand inputs, including the use of additional input channels aside from the left-hand outputs. For example: ```{literalinclude} snippets/pipe-with-closure.nf :language: groovy @@ -349,36 +349,19 @@ When the left operand is a process, the closure argument is equivalent to the `. The `&` *and* operator can be used to feed multiple processes with the same channel(s). For example: -```groovy -process foo { - input: - val data - - output: - val result - - exec: - result = "$data world" -} +```{literalinclude} snippets/and-process.nf +:language: groovy +``` -process bar { - input: - val data +In the above snippet, the initial channel is piped to the {ref}`operator-map` operator, which reverses the string value. Then, the result is passed to the processes `foo` and `bar`, which are executed in parallel. Each process outputs a channel, and the two channels are combined using the {ref}`operator-mix` operator. Finally, the result is printed using the {ref}`operator-view` operator. - output: - val result +:::{versionadded} 23.12.0-edge +::: - exec: - result = data.toUpperCase() -} +The and operator can also be used to compose channels into multi-channels. For example: -workflow { - channel.from('Hello') - | map { it.reverse() } - | (foo & bar) - | mix - | view -} +```{literalinclude} snippets/and-channel.nf +:language: groovy ``` -In the above snippet, the initial channel is piped to the {ref}`operator-map` operator, which reverses the string value. Then, the result is passed to the processes `foo` and `bar`, which are executed in parallel. Each process outputs a channel, and the two channels are combined using the {ref}`operator-mix` operator. Finally, the result is printed using the {ref}`operator-view` operator. +Note that when a multi-channel is applied to an operator, the first channel is provided as the source and the other channels are applied as the arguments. diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy index c92e4e2af3..2da231032a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy @@ -237,6 +237,38 @@ class ChannelEx { throw new ScriptRuntimeException("Cannot pipe ${fmtType(out)} with ${fmtType(right)}") } + /** + * Compose a channel with another channel into a multi-channel. + * + * @param left + * @param right + */ + static ChannelOut and(DataflowWriteChannel left, DataflowWriteChannel right) { + new ChannelOut(List.of(left, right)) + } + + static ChannelOut and(ChannelOut left, DataflowWriteChannel right) { + def elements = ChannelOut.spread(left) + elements << right + + new ChannelOut(elements) + } + + static ChannelOut and(ChannelOut left, ChannelOut right) { + def elements = ChannelOut.spread(left) + elements.addAll(ChannelOut.spread(right)) + + new ChannelOut(elements) + } + + /** + * Compose a process or workflow with another process or workflow + * such that they receive the same input channels and their outputs + * are concatenated. + * + * @param left + * @param right + */ static CompositeDef and(ChainableDef left, ChainableDef right) { checkContext('and', left) checkContext('and', right) From 61f539f2ba92b3c3456a329e0df63bf47f7bebc9 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 16 Nov 2023 14:43:20 -0600 Subject: [PATCH 5/5] Add `then` operator (equivalent to pipe with closure) Signed-off-by: Ben Sherman --- docs/workflow.md | 9 +++++ .../nextflow/extension/ChannelEx.groovy | 36 +++++++++++++------ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/docs/workflow.md b/docs/workflow.md index da2e82a406..3a9a3be96e 100644 --- a/docs/workflow.md +++ b/docs/workflow.md @@ -345,6 +345,15 @@ When using the pipe operator, the right operand can also be a closure that recei When the left operand is a process, the closure argument is equivalent to the `.out` of that process, and the output channels can be accessed by index or by name as described in [Process invocation](#process-invocation). For example, the `view` operation in the above example can be rewritten as `{ _ -> _.suffixed.view() }` to access the `suffixed` output of process `foo`. +You can also achieve the same behavior with the `then` operator: + +```groovy + channel.of('Hello','Hola','Ciao') + .map { it.toUpperCase() } + .then { _ -> foo(_, suffix) } + .view() +``` + ### And `&` The `&` *and* operator can be used to feed multiple processes with the same channel(s). For example: diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy index 2da231032a..75625d4c56 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy @@ -71,6 +71,26 @@ class ChannelEx { } } + /** + * Apply a channel as the first argument to a closure and return the result. + * + * @param source + * @param closure + */ + static Object then(final DataflowWriteChannel source, Closure closure) { + def out = closure.call(source) + if( out instanceof DataflowWriteChannel || out instanceof ChannelOut ) + return out + throw new ScriptRuntimeException("Pipeline closure did not return a channel or multi-channel") + } + + static Object then(final ChannelOut source, Closure closure) { + def out = closure.call(source) + if( out instanceof DataflowWriteChannel || out instanceof ChannelOut ) + return out + throw new ScriptRuntimeException("Pipeline closure did not return a channel or multi-channel") + } + /** * Creates a channel emitting the entries in the collection to which is applied * @@ -135,10 +155,7 @@ class ChannelEx { * @param right */ static Object or(DataflowWriteChannel left, Closure right) { - def out = right.call(left) - if( out instanceof DataflowWriteChannel || out instanceof ChannelOut ) - return out - throw new ScriptRuntimeException("Closure component did not return a channel") + then(left, right) } /** @@ -171,10 +188,7 @@ class ChannelEx { * @param right */ static Object or(ChannelOut left, Closure right) { - def out = right.call(left) - if( out instanceof DataflowWriteChannel || out instanceof ChannelOut ) - return out - throw new ScriptRuntimeException("Closure component did not return a channel") + then(left, right) } /** @@ -229,12 +243,12 @@ class ChannelEx { def out = left.invoke_a(InvokerHelper.EMPTY_ARGS) if( out instanceof DataflowWriteChannel ) - return or((DataflowWriteChannel)out, right) + return then((DataflowWriteChannel)out, right) if( out instanceof ChannelOut ) - return or((ChannelOut)out, right) + return then((ChannelOut)out, right) - throw new ScriptRuntimeException("Cannot pipe ${fmtType(out)} with ${fmtType(right)}") + throw new ScriptRuntimeException("Pipeline closure did not return a channel or multi-channel") } /**