diff --git a/docs/channel.md b/docs/channel.md index ab58b8a1b5..157ae08534 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -19,7 +19,8 @@ In Nextflow there are two kinds of channels: *queue channels* and *value channel ### Queue channel -A *queue channel* is a non-blocking unidirectional FIFO queue which connects two processes, channel factories, or operators. +A *queue channel* is a non-blocking unidirectional FIFO queue connecting a *producer* process (i.e. outputting a value) +to a consumer process, or an operator. A queue channel can be created by factory methods ([of](#of), [fromPath](#frompath), etc), operators ({ref}`operator-map`, {ref}`operator-flatmap`, etc), and processes (see {ref}`Process outputs `). @@ -27,9 +28,12 @@ A queue channel can be created by factory methods ([of](#of), [fromPath](#frompa ### Value channel -A *value channel* contains a single value and can be consumed any number of times by a process or operator. +A *value channel* can be bound (i.e. assigned) with one and only one value, and can be consumed any number of times by +a process or an operator. -A value channel can be created with the [value](#value) factory method or by any operator that produces a single value ({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel. +A value channel can be created with the [value](#value) factory method or by any operator that produces a single value +({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value +channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel. 
For example: @@ -52,7 +56,8 @@ workflow { } ``` -In the above example, since the `foo` process is invoked with a simple value instead of a channel, the input is implicitly wrapped in a value channel, and the output is also emitted as a value channel. +In the above example, since the `foo` process is invoked with a simple value instead of a channel, the input is implicitly +wrapped in a value channel, and the output is also emitted as a value channel. See also: {ref}`process-multiple-input-channels`. @@ -63,7 +68,8 @@ See also: {ref}`process-multiple-input-channels`. Channels may be created explicitly using the following channel factory methods. :::{versionadded} 20.07.0 -`channel` was introduced as an alias of `Channel`, allowing factory methods to be specified as `channel.of()` or `Channel.of()`, and so on. +`channel` was introduced as an alias of `Channel`, allowing factory methods to be specified as `channel.of()` or +`Channel.of()`, and so on. ::: (channel-empty)= @@ -429,6 +435,108 @@ Y See also: [channel.fromList](#fromlist) factory method. +(channel-topic)= + +### topic + +:::{versionadded} 23.11.0-edge +::: + +:::{note} +This feature requires the `nextflow.preview.topic` feature flag to be enabled. +::: + +A *topic* is a channel type introduced as of Nextflow 23.11.0-edge along with {ref}`channel-type-value` and +{ref}`channel-type-queue`. + +A *topic channel*, similarly to a *queue channel*, is a non-blocking unidirectional FIFO queue, however it connects +multiple *producer* processes with multiple *consumer* processes or operators. + +:::{tip} +You can think about it as a channel that is shared across many different processes using the same *topic name*. 
+::: + +A process output can be assigned to a topic using the `topic` option on an output, for example: + +```groovy +process foo { + output: + val('foo'), topic: my_topic +} + +process bar { + output: + val('bar'), topic: my_topic +} +``` + +The `channel.topic` method allows referencing the topic channel with the specified name, which can be used as a process +input or operator composition as any other Nextflow channel: + +```groovy +channel.topic('my_topic').view() +``` + +This approach is a convenient way to collect related items from many different sources without explicitly defining +the logic connecting many different queue channels altogether, commonly using the `mix` operator. + +:::{warning} +Any process that consumes a channel topic should not send any outputs to that topic, or else the pipeline will hang forever. +::: + +See also: {ref}`process-additional-options` for process outputs. + +(channel-topic)= + +### topic + +:::{versionadded} 23.11.0-edge +::: + +:::{note} +This feature requires the `nextflow.preview.topic` feature flag to be enabled. +::: + +A *topic* is a channel type introduced as of Nextflow 23.11.0-edge along with {ref}`channel-type-value` and +{ref}`channel-type-queue`. + +A *topic channel*, similarly to a *queue channel*, is a non-blocking unidirectional FIFO queue, however it connects +multiple *producer* processes with multiple *consumer* processes or operators. + +:::{tip} +You can think about it as a channel that is shared across many different processes using the same *topic name*. 
+::: + +A process output can be assigned to a topic using the `topic` option on an output, for example: + +```groovy +process foo { + output: + val('foo'), topic: my_topic +} + +process bar { + output: + val('bar'), topic: my_topic +} +``` + +The `channel.topic` method allows referencing the topic channel with the specified name, which can be used as a process +input or operator composition as any other Nextflow channel: + +```groovy +Channel.topic('my_topic').view() +``` + +This approach is a convenient way to collect related items from many different sources without explicitly defining +the logic connecting many different queue channels altogether, commonly using the `mix` operator. + +:::{warning} +Any process that consumes a channel topic should not send any outputs to that topic, or else the pipeline will hang forever. +::: + +See also: {ref}`process-additional-options` for process outputs. + (channel-value)= ### value diff --git a/docs/config.md b/docs/config.md index b22e2601f7..2848954d5f 100644 --- a/docs/config.md +++ b/docs/config.md @@ -1856,3 +1856,12 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview` : *Experimental: may change in a future release.* : When `true`, enables process and workflow recursion. See [this GitHub discussion](https://github.com/nextflow-io/nextflow/discussions/2521) for more information. + +`nextflow.preview.topic` + +: :::{versionadded} 23.11.0-edge + ::: + +: *Experimental: may change in a future release.* + +: When `true`, enables the {ref}`topic channels <channel-topic>` feature. diff --git a/docs/process.md b/docs/process.md index af245312cf..57062d19f1 100644 --- a/docs/process.md +++ b/docs/process.md @@ -1166,62 +1166,59 @@ process foo { ``` ::: -### Optional outputs +(process-additional-options)= -In most cases, a process is expected to produce an output for each output definition. However, there are situations where it is valid for a process to not generate output. 
In these cases, `optional: true` may be added to the output definition, which tells Nextflow not to fail the process if the declared output is not produced: +### Additional options -```groovy -output: - path("output.txt"), optional: true -``` +The following options are available for all process outputs: -In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is legitimately missing, the process does not fail. The output channel will only contain values for those processes that produce `output.txt`. +`emit: <name>` -(process-multiple-outputs)= +: Defines the name of the output channel, which can be used to access the channel by name from the process output: -### Multiple outputs + ```groovy + process FOO { + output: + path 'hello.txt', emit: hello + path 'bye.txt', emit: bye + + """ + echo "hello" > hello.txt + echo "bye" > bye.txt + """ + } -When a process declares multiple outputs, each output can be accessed by index. The following example prints the second process output (indexes start at zero): + workflow { + FOO() + FOO.out.hello.view() + } + ``` -```groovy -process FOO { - output: - path 'bye_file.txt' - path 'hi_file.txt' + See {ref}`workflow-process-invocation` for more details. - """ - echo "bye" > bye_file.txt - echo "hi" > hi_file.txt - """ -} +`optional: true | false` -workflow { - FOO() - FOO.out[1].view() -} -``` +: Normally, if a specified output is not produced by the task, the task will fail. Setting `optional: true` will cause the task to not fail, and instead emit nothing to the given output channel. 
-You can also use the `emit` option to assign a name to each output and access them by name: + ```groovy + output: + path("output.txt"), optional: true + ``` -```groovy -process FOO { - output: - path 'bye_file.txt', emit: bye_file - path 'hi_file.txt', emit: hi_file + In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is missing, the task will not fail. The output channel will only contain values for those tasks that produced `output.txt`. - """ - echo "bye" > bye_file.txt - echo "hi" > hi_file.txt - """ -} +: :::{note} + While this option can be used with any process output, it cannot be applied to individual elements of a [tuple](#output-type-tuple) output. The entire tuple must be optional or not optional. + ::: -workflow { - FOO() - FOO.out.hi_file.view() -} -``` +`topic: <name>` + +: :::{versionadded} 23.11.0-edge + ::: + +: *Experimental: may change in a future release.* -See {ref}`workflow-process-invocation` for more details. +: Defines the {ref}`channel topic <channel-topic>` to which the output will be sent. ## When diff --git a/docs/workflow.md b/docs/workflow.md index 260ebeb37e..6bc45f1015 100644 --- a/docs/workflow.md +++ b/docs/workflow.md @@ -104,7 +104,7 @@ workflow { } ``` -When a process defines multiple output channels, each output can be accessed using the array element operator (`out[0]`, `out[1]`, etc.) or using *named outputs* (see below). +When a process defines multiple output channels, each output can be accessed by index (`out[0]`, `out[1]`, etc.) or by name (see below). The process output(s) can also be accessed like the return value of a function: @@ -144,7 +144,7 @@ workflow { } ``` -See {ref}`process-multiple-outputs` for more details. +See {ref}`process outputs <process-additional-options>` for more details. 
### Process named stdout diff --git a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy index 822ce03537..84c0d2bf0e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Channel.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Channel.groovy @@ -121,6 +121,11 @@ class Channel { return CH.queue() } + static DataflowWriteChannel topic(String name) { + if( !NF.topicChannelEnabled ) throw new MissingMethodException('topic', Channel.class, InvokerHelper.EMPTY_ARGS) + return CH.topic(name) + } + /** * Create a empty channel i.e. only emits a STOP signal * diff --git a/modules/nextflow/src/main/groovy/nextflow/NF.groovy b/modules/nextflow/src/main/groovy/nextflow/NF.groovy index df8b592b31..3f13630088 100644 --- a/modules/nextflow/src/main/groovy/nextflow/NF.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/NF.groovy @@ -67,4 +67,8 @@ class NF { static boolean isRecurseEnabled() { NextflowMeta.instance.preview.recursion } + + static boolean isTopicChannelEnabled() { + NextflowMeta.instance.preview.topic + } } diff --git a/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy b/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy index b9b279fb87..bd350b945d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy @@ -43,6 +43,7 @@ class NextflowMeta { volatile float dsl boolean strict boolean recursion + boolean topic void setDsl( float num ) { if( num == 1 ) @@ -59,6 +60,13 @@ class NextflowMeta { log.warn "NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES" this.recursion = recurse } + + /** Sets the `topic` preview flag, logging a warning when the incoming value enables it (the check is on `value`, not on the current field). */ + void setTopic(Boolean value) { + if( value ) + log.warn "CHANNEL TOPICS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES" + this.topic = value + } } static class Features implements Flags { diff --git 
a/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy index a278fdd8f6..0586019d7a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy @@ -915,15 +915,16 @@ class NextflowDSLImpl implements ASTTransformation { /** * Transform a map entry `emit: something` into `emit: 'something' + * and `topic: something` into `topic: 'something' * (ie. as a constant) in a map expression passed as argument to * a method call. This allow the syntax * * output: - * path 'foo', emit: bar + * path 'foo', emit: bar, topic: baz * * @param call */ - protected void fixOutEmitOption(MethodCallExpression call) { + protected void fixOutEmitAndTopicOptions(MethodCallExpression call) { List args = isTupleX(call.arguments)?.expressions if( !args ) return if( args.size()<2 && (args.size()!=1 || call.methodAsString!='_out_stdout')) return @@ -936,6 +937,9 @@ class NextflowDSLImpl implements ASTTransformation { if( key?.text == 'emit' && val ) { map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text)) } + else if( key?.text == 'topic' && val ) { + map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text)) + } } } @@ -955,7 +959,7 @@ class NextflowDSLImpl implements ASTTransformation { // prefix the method name with the string '_out_' methodCall.setMethod( new ConstantExpression('_out_' + methodName) ) fixMethodCall(methodCall) - fixOutEmitOption(methodCall) + fixOutEmitAndTopicOptions(methodCall) } else if( methodName in ['into','mode'] ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy index ba8727808e..6069317f86 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy @@ -1,5 +1,7 @@ package 
nextflow.extension +import static nextflow.Channel.* + import groovy.transform.CompileStatic import groovy.transform.PackageScope import groovy.util.logging.Slf4j @@ -15,8 +17,6 @@ import nextflow.Channel import nextflow.Global import nextflow.NF import nextflow.Session -import static nextflow.Channel.STOP - /** * Helper class to handle channel internal api ops * @@ -30,7 +30,14 @@ class CH { return (Session) Global.session } - static private Map bridges = new HashMap<>(10) + static class Topic { + DataflowBroadcast broadcaster = new DataflowBroadcast() + List sources = new ArrayList<>(10) + } + + static final private Map allTopics = new HashMap<>(10) + + static final private Map bridges = new HashMap<>(10) static DataflowReadChannel getReadChannel(channel) { if (channel instanceof DataflowQueue) @@ -66,7 +73,15 @@ class CH { } static void broadcast() { - // connect all dataflow queue variables to associated broadcast channel + // connect all broadcast topics, note this must be before the following + // "bridging" step because it can modify the final network topology + connectTopics() + // bridge together all broadcast channels + bridgeChannels() + } + + static private void bridgeChannels() { + // connect all dataflow queue variables to associated broadcast channel for( DataflowQueue queue : bridges.keySet() ) { log.trace "Bridging dataflow queue=$queue" def broadcast = bridges.get(queue) @@ -74,6 +89,25 @@ class CH { } } + static private void connectTopics() { + for( Topic topic : allTopics.values() ) { + if( topic.sources ) { + // get the list of source channels for this topic + final ch = new ArrayList(topic.sources) + // the mix operator requires at least two sources, add an empty channel if needed + if( ch.size()==1 ) + ch.add(empty()) + // map write channels to read channels + final sources = ch.collect(it -> getReadChannel(it)) + // mix all of them + new MixOp(sources).withTarget(topic.broadcaster).apply() + } + else { + topic.broadcaster.bind(STOP) + } + } 
+ } + static void init() { bridges.clear() } @PackageScope @@ -102,6 +136,31 @@ class CH { return new DataflowQueue() } + static DataflowBroadcast topic(String name) { + synchronized (allTopics) { + def topic = allTopics[name] + if( topic!=null ) + return topic.broadcaster + // create a new topic + topic = new Topic() + allTopics[name] = topic + return topic.broadcaster + } + } + + static DataflowWriteChannel createTopicSource(String name) { + synchronized (allTopics) { + def topic = allTopics.get(name) + if( topic==null ) { + topic = new Topic() + allTopics.put(name, topic) + } + final result = CH.create() + topic.sources.add(result) + return result + } + } + static boolean isChannel(obj) { obj instanceof DataflowReadChannel || obj instanceof DataflowWriteChannel } diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy index f8e0aed304..62ba37d5f2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy @@ -36,6 +36,7 @@ class MixOp { private DataflowReadChannel source private List others + private DataflowWriteChannel target MixOp(DataflowReadChannel source, DataflowReadChannel other) { this.source = source @@ -47,8 +48,21 @@ class MixOp { this.others = others.toList() } + MixOp(List channels) { + if( channels.size()<2 ) + throw new IllegalArgumentException("Mix operator requires at least 2 source channels") + this.source = channels.get(0) + this.others = channels.subList(1, channels.size()) + } + + MixOp withTarget(DataflowWriteChannel target) { + this.target = target + return this + } + DataflowWriteChannel apply() { - def target = CH.create() + if( target == null ) + target = CH.create() def count = new AtomicInteger( others.size()+1 ) def handlers = [ onNext: { target << it }, diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy 
b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy index 7cfc6309cc..caac95d11d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy @@ -194,8 +194,14 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { throw new ScriptRuntimeException("Process `$processName` inputs and outputs do not have the same cardinality - Feedback loop is not supported" ) for(int i=0; i + then: + outs[0].name == 'x' + outs[0].channelTopicName == 'ch0' + and: + outs[1].name == 'FOO' + outs[1].channelTopicName == 'ch1' + and: + outs[2] instanceof StdOutParam // <-- note: declared as `path`, turned into a `stdout` + outs[2].name == '-' + outs[2].channelTopicName == 'ch2' + and: + outs[3] instanceof StdOutParam + outs[3].name == '-' + outs[3].channelTopicName == 'ch3' + } + + def 'should define out tuple with topic'() { + + setup: + def text = ''' + process hola { + output: + tuple val(x), val(y), topic: ch1 + tuple path('foo'), topic: ch2 + tuple stdout,env(bar), topic: ch3 + + /return/ + } + + workflow { hola() } + ''' + + def binding = [:] + def process = parseAndReturnProcess(text, binding) + + when: + def outs = process.config.getOutputs() as List + + then: + outs[0].name == 'tupleoutparam<0>' + outs[0].channelTopicName == 'ch1' + outs[0].inner[0] instanceof ValueOutParam + outs[0].inner[0].name == 'x' + outs[0].inner[1] instanceof ValueOutParam + outs[0].inner[1].name == 'y' + and: + outs[1].name == 'tupleoutparam<1>' + outs[1].channelTopicName == 'ch2' + outs[1].inner[0] instanceof FileOutParam + and: + outs[2].name == 'tupleoutparam<2>' + outs[2].channelTopicName == 'ch3' + outs[2].inner[0] instanceof StdOutParam + outs[2].inner[0].name == '-' + outs[2].inner[1] instanceof EnvOutParam + outs[2].inner[1].name == 'bar' + + } } diff --git a/tests/checks/topic-channel.nf/.checks b/tests/checks/topic-channel.nf/.checks new file mode 100644 index 
0000000000..425cb15ae2 --- /dev/null +++ b/tests/checks/topic-channel.nf/.checks @@ -0,0 +1,15 @@ +# +# initial run +# +echo Initial run +$NXF_RUN + +cmp versions.txt .expected || false + +# +# Resumed run +# +echo Resumed run +$NXF_RUN -resume + +cmp versions.txt .expected || false diff --git a/tests/checks/topic-channel.nf/.expected b/tests/checks/topic-channel.nf/.expected new file mode 100644 index 0000000000..1bb4d6b958 --- /dev/null +++ b/tests/checks/topic-channel.nf/.expected @@ -0,0 +1,2 @@ +bar: 0.9.0 +foo: 0.1.0 diff --git a/tests/topic-channel.nf b/tests/topic-channel.nf new file mode 100644 index 0000000000..c5eb17bc97 --- /dev/null +++ b/tests/topic-channel.nf @@ -0,0 +1,37 @@ + +nextflow.preview.topic = true + +process foo { + input: + val(index) + + output: + stdout emit: versions, topic: versions + + script: + """ + echo 'foo: 0.1.0' + """ +} + +process bar { + input: + val(index) + + output: + stdout emit: versions, topic: versions + + script: + """ + echo 'bar: 0.9.0' + """ +} + +workflow { + Channel.of( 1..3 ) | foo + Channel.of( 1..3 ) | bar + + Channel.topic('versions') + | unique + | collectFile(name: 'versions.txt', sort: true, storeDir: '.') +}