Add "topic" channel (#4459)
This commit adds experimental support for topic channels.

A topic channel allows multiple processes to write to the same
channel, identified by a shared topic name.


Signed-off-by: Paolo Di Tommaso <[email protected]>
Signed-off-by: Ben Sherman <[email protected]>
Co-authored-by: Ben Sherman <[email protected]>
pditommaso and bentsherman authored Nov 24, 2023
1 parent b641d67 commit 921313d
Showing 17 changed files with 426 additions and 60 deletions.
118 changes: 113 additions & 5 deletions docs/channel.md
@@ -19,17 +19,21 @@ In Nextflow there are two kinds of channels: *queue channels* and *value channel

### Queue channel

A *queue channel* is a non-blocking unidirectional FIFO queue which connects two processes, channel factories, or operators.
A *queue channel* is a non-blocking unidirectional FIFO queue connecting a *producer* process (i.e. outputting a value)
to a *consumer* process or an operator.

A queue channel can be created by factory methods ([of](#of), [fromPath](#frompath), etc), operators ({ref}`operator-map`, {ref}`operator-flatmap`, etc), and processes (see {ref}`Process outputs <process-output>`).

(channel-type-value)=

### Value channel

A *value channel* contains a single value and can be consumed any number of times by a process or operator.
A *value channel* can be bound (i.e. assigned) with one and only one value, and can be consumed any number of times by
a process or an operator.

A value channel can be created with the [value](#value) factory method or by any operator that produces a single value ({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel.
A value channel can be created with the [value](#value) factory method or by any operator that produces a single value
({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value
channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel.

For example:

@@ -52,7 +56,8 @@ workflow {
}
```

In the above example, since the `foo` process is invoked with a simple value instead of a channel, the input is implicitly wrapped in a value channel, and the output is also emitted as a value channel.
In the above example, since the `foo` process is invoked with a simple value instead of a channel, the input is implicitly
wrapped in a value channel, and the output is also emitted as a value channel.

See also: {ref}`process-multiple-input-channels`.

@@ -63,7 +68,8 @@ See also: {ref}`process-multiple-input-channels`.
Channels may be created explicitly using the following channel factory methods.

:::{versionadded} 20.07.0
`channel` was introduced as an alias of `Channel`, allowing factory methods to be specified as `channel.of()` or `Channel.of()`, and so on.
`channel` was introduced as an alias of `Channel`, allowing factory methods to be specified as `channel.of()` or
`Channel.of()`, and so on.
:::

(channel-empty)=
@@ -429,6 +435,108 @@ Y
See also: [channel.fromList](#fromlist) factory method.

(channel-topic)=

### topic

:::{versionadded} 23.11.0-edge
:::

:::{note}
This feature requires the `nextflow.preview.topic` feature flag to be enabled.
:::

A *topic* is a channel type introduced in Nextflow 23.11.0-edge, in addition to {ref}`channel-type-value` and
{ref}`channel-type-queue`.

A *topic channel*, like a *queue channel*, is a non-blocking unidirectional FIFO queue. However, it connects
multiple *producer* processes with multiple *consumer* processes or operators.

:::{tip}
You can think of it as a channel that is shared across many different processes using the same *topic name*.
:::

A process output can be assigned to a topic using the `topic` option on an output, for example:

```groovy
process foo {
    output:
    val('foo'), topic: my_topic
}

process bar {
    output:
    val('bar'), topic: my_topic
}
```

The `channel.topic` method returns a reference to the topic channel with the specified name, which can be used as a process
input or in operator compositions like any other Nextflow channel:

```groovy
channel.topic('my_topic').view()
```

This approach is a convenient way to collect related items from many different sources without explicitly
wiring many different queue channels together, which is commonly done with the `mix` operator.

:::{warning}
Any process that consumes a topic channel should not send any outputs to that topic, or else the pipeline will hang forever.
:::

See also: {ref}`process-additional-options` for process outputs.
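
The pieces above can be put together roughly as follows. This is a minimal sketch rather than a tested pipeline. It assumes the preview flag is set at the top of the script, that the name passed to `channel.topic` matches the name used in the `topic` option, and that each process gets a placeholder `script` block (the `true` command is not part of the original example).

```groovy
// Assumption: the preview flag is enabled at the top of the pipeline script
nextflow.preview.topic = true

process foo {
    output:
    val('foo'), topic: my_topic

    script:
    """
    true
    """
}

process bar {
    output:
    val('bar'), topic: my_topic

    script:
    """
    true
    """
}

workflow {
    foo()
    bar()

    // Without the `topic` option, the two outputs would typically be
    // combined explicitly, e.g. foo.out.mix(bar.out).view()

    // With topics, the consumer only needs the shared topic name
    channel.topic('my_topic').view()
}
```

Neither process reads from `my_topic`, so this layout stays clear of the hang described in the warning above. The order in which the two values arrive is not specified.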
(channel-value)=
### value
9 changes: 9 additions & 0 deletions docs/config.md
@@ -1856,3 +1856,12 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview`
: *Experimental: may change in a future release.*

: When `true`, enables process and workflow recursion. See [this GitHub discussion](https://github.com/nextflow-io/nextflow/discussions/2521) for more information.

`nextflow.preview.topic`

: :::{versionadded} 23.11.0-edge
:::

: *Experimental: may change in a future release.*

: When `true`, enables the {ref}`topic channels <channel-topic>` feature.
81 changes: 39 additions & 42 deletions docs/process.md
@@ -1166,62 +1166,59 @@ process foo {
```
:::

### Optional outputs
(process-additional-options)=

In most cases, a process is expected to produce an output for each output definition. However, there are situations where it is valid for a process to not generate output. In these cases, `optional: true` may be added to the output definition, which tells Nextflow not to fail the process if the declared output is not produced:
### Additional options

```groovy
output:
path("output.txt"), optional: true
```
The following options are available for all process outputs:

In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is legitimately missing, the process does not fail. The output channel will only contain values for those processes that produce `output.txt`.
`emit: <name>`

(process-multiple-outputs)=
: Defines the name of the output channel, which can be used to access the channel by name from the process output:

### Multiple outputs
```groovy
process FOO {
output:
path 'hello.txt', emit: hello
path 'bye.txt', emit: bye
"""
echo "hello" > hello.txt
echo "bye" > bye.txt
"""
}
When a process declares multiple outputs, each output can be accessed by index. The following example prints the second process output (indexes start at zero):
workflow {
FOO()
FOO.out.hello.view()
}
```

```groovy
process FOO {
output:
path 'bye_file.txt'
path 'hi_file.txt'
See {ref}`workflow-process-invocation` for more details.

"""
echo "bye" > bye_file.txt
echo "hi" > hi_file.txt
"""
}
`optional: true | false`

workflow {
FOO()
FOO.out[1].view()
}
```
: Normally, if a specified output is not produced by the task, the task will fail. Setting `optional: true` will cause the task to not fail, and instead emit nothing to the given output channel.

You can also use the `emit` option to assign a name to each output and access them by name:
```groovy
output:
path("output.txt"), optional: true
```

```groovy
process FOO {
output:
path 'bye_file.txt', emit: bye_file
path 'hi_file.txt', emit: hi_file
In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is missing, the task will not fail. The output channel will only contain values for those tasks that produced `output.txt`.

"""
echo "bye" > bye_file.txt
echo "hi" > hi_file.txt
"""
}
: :::{note}
While this option can be used with any process output, it cannot be applied to individual elements of a [tuple](#output-type-tuple) output. The entire tuple must be optional or not optional.
:::

workflow {
FOO()
FOO.out.hi_file.view()
}
```
`topic: <name>`

: :::{versionadded} 23.11.0-edge
:::

: *Experimental: may change in a future release.*

See {ref}`workflow-process-invocation` for more details.
: Defines the {ref}`channel topic <channel-topic>` to which the output will be sent.
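
The DSL transform in this commit rewrites both `emit` and `topic` names to constants, and its comment shows them used on the same output, so the two options can apparently be combined. A minimal sketch, assuming the `nextflow.preview.topic` flag is enabled. The process name `FOO`, the file `versions.txt`, and the name `versions` are hypothetical and chosen only for illustration.

```groovy
process FOO {
    output:
    // Named output that is also sent to the 'versions' topic (hypothetical names)
    path 'versions.txt', emit: versions, topic: versions

    script:
    """
    echo "foo 1.0" > versions.txt
    """
}
```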

## When

4 changes: 2 additions & 2 deletions docs/workflow.md
@@ -104,7 +104,7 @@ workflow {
}
```

When a process defines multiple output channels, each output can be accessed using the array element operator (`out[0]`, `out[1]`, etc.) or using *named outputs* (see below).
When a process defines multiple output channels, each output can be accessed by index (`out[0]`, `out[1]`, etc.) or by name (see below).

The process output(s) can also be accessed like the return value of a function:

@@ -144,7 +144,7 @@ workflow {
}
```

See {ref}`process-multiple-outputs` for more details.
See {ref}`process outputs <process-additional-options>` for more details.

### Process named stdout

5 changes: 5 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Channel.groovy
@@ -121,6 +121,11 @@ class Channel {
return CH.queue()
}

static DataflowWriteChannel topic(String name) {
if( !NF.topicChannelEnabled ) throw new MissingMethodException('topic', Channel.class, InvokerHelper.EMPTY_ARGS)
return CH.topic(name)
}

/**
* Create an empty channel i.e. only emits a STOP signal
*
4 changes: 4 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NF.groovy
@@ -67,4 +67,8 @@ class NF {
static boolean isRecurseEnabled() {
NextflowMeta.instance.preview.recursion
}

static boolean isTopicChannelEnabled() {
NextflowMeta.instance.preview.topic
}
}
7 changes: 7 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy
@@ -43,6 +43,7 @@ class NextflowMeta {
volatile float dsl
boolean strict
boolean recursion
boolean topic

void setDsl( float num ) {
if( num == 1 )
@@ -59,6 +60,12 @@ class NextflowMeta {
log.warn "NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
this.recursion = recurse
}

void setTopic(Boolean value) {
if( value )
log.warn "CHANNEL TOPICS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
this.topic = value
}
}

static class Features implements Flags {
@@ -915,15 +915,16 @@ class NextflowDSLImpl implements ASTTransformation {

/**
* Transform a map entry `emit: something` into `emit: 'something'`
* and `topic: something` into `topic: 'something'`
* (ie. as a constant) in a map expression passed as argument to
* a method call. This allows the syntax
*
* output:
* path 'foo', emit: bar
* path 'foo', emit: bar, topic: baz
*
* @param call
*/
protected void fixOutEmitOption(MethodCallExpression call) {
protected void fixOutEmitAndTopicOptions(MethodCallExpression call) {
List<Expression> args = isTupleX(call.arguments)?.expressions
if( !args ) return
if( args.size()<2 && (args.size()!=1 || call.methodAsString!='_out_stdout')) return
@@ -936,6 +937,9 @@ class NextflowDSLImpl implements ASTTransformation {
if( key?.text == 'emit' && val ) {
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}
else if( key?.text == 'topic' && val ) {
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}
}
}

@@ -955,7 +959,7 @@ class NextflowDSLImpl implements ASTTransformation {
// prefix the method name with the string '_out_'
methodCall.setMethod( new ConstantExpression('_out_' + methodName) )
fixMethodCall(methodCall)
fixOutEmitOption(methodCall)
fixOutEmitAndTopicOptions(methodCall)
}

else if( methodName in ['into','mode'] ) {