diff --git a/docs/snippets/and-channel.nf b/docs/snippets/and-channel.nf new file mode 100644 index 0000000000..167a04b7ff --- /dev/null +++ b/docs/snippets/and-channel.nf @@ -0,0 +1,22 @@ +process foo { + input: + val begin + val middle + val end + + output: + val result + + exec: + result = "$begin $middle $end" +} + +workflow { + ch_begin = channel.of('Hello') + ch_middle = channel.of('world') + ch_end = channel.of('!!!') + + (ch_begin & ch_middle & ch_end) + | foo + | view +} \ No newline at end of file diff --git a/docs/snippets/and-channel.out b/docs/snippets/and-channel.out new file mode 100644 index 0000000000..b528ced883 --- /dev/null +++ b/docs/snippets/and-channel.out @@ -0,0 +1 @@ +Hello world !!! \ No newline at end of file diff --git a/docs/snippets/and-process.nf b/docs/snippets/and-process.nf new file mode 100644 index 0000000000..0da14dd636 --- /dev/null +++ b/docs/snippets/and-process.nf @@ -0,0 +1,29 @@ +process foo { + input: + val message + + output: + val result + + exec: + result = "$message world" +} + +process bar { + input: + val message + + output: + val result + + exec: + result = message.toUpperCase() +} + +workflow { + channel.of('Hello') + | map { it.reverse() } + | (foo & bar) + | mix + | view +} \ No newline at end of file diff --git a/docs/snippets/and-process.out b/docs/snippets/and-process.out new file mode 100644 index 0000000000..060b22de16 --- /dev/null +++ b/docs/snippets/and-process.out @@ -0,0 +1,2 @@ +olleH world +OLLEH \ No newline at end of file diff --git a/docs/snippets/pipe-with-closure.nf b/docs/snippets/pipe-with-closure.nf new file mode 100644 index 0000000000..d6d880f9d6 --- /dev/null +++ b/docs/snippets/pipe-with-closure.nf @@ -0,0 +1,19 @@ +process foo { + input: + val message + val suffix + + output: + val result, emit: suffixed + + exec: + result = "${message}${suffix}" +} + +workflow { + suffix = ' world!' + channel.of('Hello','Hola','Ciao') + | map { it.toUpperCase() } + | { _ -> foo(_, suffix) } + | view +} \ No newline at end of file diff --git a/docs/snippets/pipe-with-closure.out b/docs/snippets/pipe-with-closure.out new file mode 100644 index 0000000000..5220cf2707 --- /dev/null +++ b/docs/snippets/pipe-with-closure.out @@ -0,0 +1,3 @@ +HELLO world! +HOLA world! +CIAO world! \ No newline at end of file diff --git a/docs/snippets/pipe.nf b/docs/snippets/pipe.nf new file mode 100644 index 0000000000..0d45bd3845 --- /dev/null +++ b/docs/snippets/pipe.nf @@ -0,0 +1,14 @@ +process foo { + input: + val message + + output: + val result + + exec: + result = "$message world" +} + +workflow { + channel.of('Hello','Hola','Ciao') | foo | map { it.toUpperCase() } | view +} \ No newline at end of file diff --git a/docs/snippets/pipe.out b/docs/snippets/pipe.out new file mode 100644 index 0000000000..fee117ac47 --- /dev/null +++ b/docs/snippets/pipe.out @@ -0,0 +1,3 @@ +HELLO WORLD +HOLA WORLD +CIAO WORLD \ No newline at end of file diff --git a/docs/snippets/process-named-stdout.nf b/docs/snippets/process-named-stdout.nf new file mode 100644 index 0000000000..2cf99bcf72 --- /dev/null +++ b/docs/snippets/process-named-stdout.nf @@ -0,0 +1,18 @@ +process sayHello { + input: + val cheers + + output: + stdout emit: verbiage + + script: + """ + echo -n $cheers + """ +} + +workflow { + things = channel.of('Hello world!', 'Yo, dude!', 'Duck!') + sayHello(things) + sayHello.out.verbiage.view() +} \ No newline at end of file diff --git a/docs/snippets/process-named-stdout.out b/docs/snippets/process-named-stdout.out new file mode 100644 index 0000000000..ffd7b891ec --- /dev/null +++ b/docs/snippets/process-named-stdout.out @@ -0,0 +1,3 @@ +Hello world! +Yo, dude! +Duck! \ No newline at end of file diff --git a/docs/workflow.md b/docs/workflow.md index 260ebeb37e..3a9a3be96e 100644 --- a/docs/workflow.md +++ b/docs/workflow.md @@ -150,25 +150,8 @@ See {ref}`process-multiple-outputs` for more details. The `emit` option can also be used to name a `stdout` output: -```groovy -process sayHello { - input: - val cheers - - output: - stdout emit: verbiage - - script: - """ - echo -n $cheers - """ -} - -workflow { - things = channel.of('Hello world!', 'Yo, dude!', 'Duck!') - sayHello(things) - sayHello.out.verbiage.view() -} +```{literalinclude} snippets/process-named-stdout.nf +:language: groovy ``` :::{note} @@ -243,7 +226,7 @@ Inputs can be specified like arguments when invoking the workflow: ```groovy workflow { - my_pipeline( channel.from('/some/data') ) + my_pipeline( channel.of('/some/data') ) } ``` @@ -334,72 +317,60 @@ The fully qualified process name can be used as a {ref}`process selector _.suffixed.view() }` to access the `suffixed` output of process `foo`. + +You can also achieve the same behavior with the `then` operator: ```groovy -process foo { - input: - val data + channel.of('Hello','Hola','Ciao') + .map { it.toUpperCase() } + .then { _ -> foo(_, suffix) } + .view() +``` - output: - val result +### And `&` - exec: - result = "$data world" -} +The `&` *and* operator can be used to feed multiple processes with the same channel(s). For example: -process bar { - input: - val data +```{literalinclude} snippets/and-process.nf +:language: groovy +``` - output: - val result +In the above snippet, the initial channel is piped to the {ref}`operator-map` operator, which reverses the string value. Then, the result is passed to the processes `foo` and `bar`, which are executed in parallel. Each process outputs a channel, and the two channels are combined using the {ref}`operator-mix` operator. Finally, the result is printed using the {ref}`operator-view` operator. - exec: - result = data.toUpperCase() -} +:::{versionadded} 23.12.0-edge +::: -workflow { - channel.from('Hello') - | map { it.reverse() } - | (foo & bar) - | mix - | view -} +The and operator can also be used to compose channels into multi-channels. For example: + +```{literalinclude} snippets/and-channel.nf +:language: groovy ``` -In the above snippet, the initial channel is piped to the {ref}`operator-map` operator, which reverses the string value. Then, the result is passed to the processes `foo` and `bar`, which are executed in parallel. Each process outputs a channel, and the two channels are combined using the {ref}`operator-mix` operator. Finally, the result is printed using the {ref}`operator-view` operator. +Note that when a multi-channel is applied to an operator, the first channel is provided as the source and the other channels are applied as the arguments. diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy index 2ebc20ae87..75625d4c56 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/ChannelEx.groovy @@ -71,6 +71,26 @@ class ChannelEx { } } + /** + * Apply a channel as the first argument to a closure and return the result. + * + * @param source + * @param closure + */ + static Object then(final DataflowWriteChannel source, Closure closure) { + def out = closure.call(source) + if( out instanceof DataflowWriteChannel || out instanceof ChannelOut ) + return out + throw new ScriptRuntimeException("Pipeline closure did not return a channel or multi-channel") + } + + static Object then(final ChannelOut source, Closure closure) { + def out = closure.call(source) + if( out instanceof DataflowWriteChannel || out instanceof ChannelOut ) + return out + throw new ScriptRuntimeException("Pipeline closure did not return a channel or multi-channel") + } + /** * Creates a channel emitting the entries in the collection to which is applied * @@ -106,11 +126,10 @@ class ChannelEx { } /** - * Implements pipe operation between a channel and a process or a sub-workflow + * Pipe a channel INTO a process or workflow. * - * @param left A dataflow channel instance - * @param right A {@link ChainableDef} object eg. a nextflow process - * @return The channel resulting the pipe operation + * @param left + * @param right */ static Object or(DataflowWriteChannel left, ChainableDef right) { checkContext('or', right) @@ -118,11 +137,10 @@ class ChannelEx { } /** - * Implements pipe operation between a channel WITH a operator + * Pipe a channel INTO an operator. * - * @param left A {@code DataflowWriteChannel} channel as left operand - * @param right A {@code OpCall} object representing a operator call as right operand - * @return The resulting channel object + * @param left + * @param right */ static Object or(DataflowWriteChannel left, OpCall right) { checkContext('or', right) @@ -130,11 +148,21 @@ class ChannelEx { } /** - * Implements pipe operation between a multi-channels WITH a process or a sub-workflow + * Pipe a channel INTO a closure that defines a custom + * invocation of a process, workflow, or operator. * - * @param left A {@code ChannelOut} multi-channel object as left operand - * @param right A {@code ChainableDef} object representing a process or sub-workflow call as right operand - * @return The resulting channel object + * @param left + * @param right + */ + static Object or(DataflowWriteChannel left, Closure right) { + then(left, right) + } + + /** + * Pipe a multi-channel INTO a process or workflow. + * + * @param left + * @param right */ static Object or(ChannelOut left, ChainableDef right) { checkContext('or', right) @@ -142,11 +170,10 @@ class ChannelEx { } /** - * Implements pipe operation between a multi-channels WITH a operator + * Pipe a multi-channel INTO an operator. * - * @param left A {@code ChannelOut} multi-channel object as left operand - * @param right A {@code OpCall} object representing a operator call as right operand - * @return The resulting channel object + * @param left + * @param right */ static Object or(ChannelOut left, OpCall right) { checkContext('or', right) @@ -154,14 +181,26 @@ class ChannelEx { } /** - * Implements pipe operation between a process or sub-workflow WITH a operator + * Pipe a multi-channel INTO a closure that defines a custom + * invocation of a process, workflow, or operator. * - * @param left A {@code ChainableDef} object representing a process or sub-workflow call as left operand - * @param right A {@code OpCall} object representing a operator call as right operand - * @return The resulting channel object + * @param left + * @param right */ - static Object or(ChainableDef left, OpCall right) { + static Object or(ChannelOut left, Closure right) { + then(left, right) + } + + /** + * Pipe a process or workflow INTO another process or workflow. + * + * @param left + * @param right + */ + static Object or(ChainableDef left, ChainableDef right) { checkContext('or', left) + checkContext('or', right) + def out = left.invoke_a(InvokerHelper.EMPTY_ARGS) if( out instanceof DataflowWriteChannel ) @@ -174,16 +213,13 @@ class ChannelEx { } /** - * Implements pipe operation between a process or sub-workflow WITH another process or sub-workflow + * Pipe a process or workflow INTO an operator. * - * @param left A {@code ChainableDef} object representing a process or sub-workflow call as left operand - * @param right A {@code ChainableDef} object representing a process or sub-workflow call as right operand - * @return + * @param left + * @param right */ - static Object or(ChainableDef left, ChainableDef right) { + static Object or(ChainableDef left, OpCall right) { checkContext('or', left) - checkContext('or', right) - def out = left.invoke_a(InvokerHelper.EMPTY_ARGS) if( out instanceof DataflowWriteChannel ) @@ -195,6 +231,58 @@ class ChannelEx { throw new ScriptRuntimeException("Cannot pipe ${fmtType(out)} with ${fmtType(right)}") } + /** + * Pipe a process or workflow INTO a closure that defines a custom + * invocation of a process, workflow, or operator. + * + * @param left + * @param right + */ + static Object or(ChainableDef left, Closure right) { + checkContext('or', left) + def out = left.invoke_a(InvokerHelper.EMPTY_ARGS) + + if( out instanceof DataflowWriteChannel ) + return then((DataflowWriteChannel)out, right) + + if( out instanceof ChannelOut ) + return then((ChannelOut)out, right) + + throw new ScriptRuntimeException("Pipeline closure did not return a channel or multi-channel") + } + + /** + * Compose a channel with another channel into a multi-channel. + * + * @param left + * @param right + */ + static ChannelOut and(DataflowWriteChannel left, DataflowWriteChannel right) { + new ChannelOut(List.of(left, right)) + } + + static ChannelOut and(ChannelOut left, DataflowWriteChannel right) { + def elements = ChannelOut.spread(left) + elements << right + + new ChannelOut(elements) + } + + static ChannelOut and(ChannelOut left, ChannelOut right) { + def elements = ChannelOut.spread(left) + elements.addAll(ChannelOut.spread(right)) + + new ChannelOut(elements) + } + + /** + * Compose a process or workflow with another process or workflow + * such that they receive the same input channels and their outputs + * are concatenated. + * + * @param left + * @param right + */ static CompositeDef and(ChainableDef left, ChainableDef right) { checkContext('and', left) checkContext('and', right)