Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "topic" channel #4459

Merged
merged 22 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions docs/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,47 @@ Y

See also: [fromList](#fromlist) factory method.

(channel-topic)=

### topic

:::{versionadded} 23.11.0-edge
:::

:::{note}
This feature requires the `nextflow.preview.topic` feature flag to be enabled.
:::

The `topic` method is used to create a "topic" channel, which is a queue channel that can receive items from multiple sources.
Copy link
Member

@bentsherman bentsherman Nov 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you want to describe "topic" channels as a new channel type alongside queue and value channels. Since a topic channel seems to behave like a queue channel, too keep things simple, I described it in this way

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Surely it deserves to be expanded. Another possibility could be introduce the "broadcast" channel type, along "queue" and "value". A broadcast channel can even many writers and many readers (opposed to a queue channel than can have exactly one write and reader) and it's identified by a "topic" name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like "broadcast" more than "topic", sounds to me like more appropriate jargon in the scope of Nextflow

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Broadcast is not specific enough IMO because a value channel is also a broadcast (it can have many readers). Even the underlying GPars class is DataflowBroadcast. The topic channel is distinct because it can have many writers, but I don't know of any special term for that. I will see if I can find something from stream processing or digital circuits terminology...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find anything beyond the event bus pattern. On further reflection, I don't think we need to distinguish topic channels as a special channel type. A topic channel is just shorthand for a mix operation:

ch_foo = mix(foo1, foo2, foo3, foo4, foo5)

It's just a bunch of queue channels, and the "topic" is what brings them together.

I'm open to other words than topic if we can find a better one. Some alternatives include "category", "label", "tag"... but label and tag are already concepts in Nextflow and category is too broad IMO. "metadata" implies that the topic channel can only be used to collect metadata, but that need not be the case. I like the idea of describing the topic channel as an event bus, but any channel could be called a "bus".

How about... Channel.mixer()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quoting Paolo's comment which got mixed up:

I'm fine to stay with "topic" channel definition, but they should be documented as a new channel type because they have different semantics (many writers).

Thus it should be channel.topic, not fromTopic

Thinking more on this, I still think we should call it Channel.fromTopic and describe it as an operation that creates a queue channel rather than a new channel type. Marco, Phil, and I seem to be in agreement on this, but since this thread has meandered and the meeting didn't reach a solid conclusion, here is my argument put concisely:

A channel topic is literally an operation on queue channels. There is no new channel type under the hood, just queue channels coming in and a queue channel going out. It uses the DataflowBroadcast only to support multiple readers (in the same way as queue channels) and it uses the mix operator to support the multiple writers. In fact many operators support multiple writers, so that alone is not enough to warrant a new channel type. The implicit linking via topic is more unique, but when I tried to document it as a new channel type, I just found it unnecessary and more confusing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it uses the mix operator to support the multiple writers. In fact many operators support multiple writers, so that alone is not enough to warrant a new channel type.

👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you need to see things from the proper perspective. The most important thing is that the topic channel introduces a different way to compose and think Nextflow channels.

Instead of having one-to-one, producer-to-consumer messaging, the topic allows many producers to send messages over the same topic to many consumers.

It doesn't matter it could have been implemented using a composition of mix operators or how it's implemented under the door. The topic type is important to highlight the different paradigm that is introduced by this feature.

I've made a few changes in the docs to reflect this view. In any case, this is marked as experimental, we can always review and changes along the way in future releases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the topic channel introduces a different way to compose and think Nextflow channels.
The topic type is important to highlight the different paradigm that is introduced by this feature.

I had this chat with Paolo as well. I now better get the point around proposing a new paradigm, that unlocks new ways for devs to describe their pipelines.

Overall it was a good discussion, and most importantly it is good to have it as experimental, to leave room for upgrades in case we identify the need for them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do see the other perspective Paolo, after all I started this whole thread, but my thoughts evolved and I no longer think it's necessary to describe channel topics as a new type. Although it doesn't always happen this way, in this case I think the implementation details are quite instructive in how to describe it. If someone figures out to use a channel topic in some way other than an implicit mix, I might be convinced otherwise.

But I'm glad you went ahead and merged it. Better to get the feature out there for users to play with it. We can refine the docs as needed.


Any process can send items to a topic by using the `topic` option on an output:

```groovy
process foo {
output:
val('foo'), topic: 'my-topic'
bentsherman marked this conversation as resolved.
Show resolved Hide resolved
}

process bar {
output:
val('bar'), topic: 'my-topic'
}
```

Then, the `topic` method can be used to consume all items in the topic:

```groovy
Channel.topic('my-topic').view()
```

This approach is a convenient way to collect outputs from many sources without having to write the necessary channel logic. You can name topics however you want, and you can use different names to collect items for different "topics", as long as your process outputs and channel logic are consistent with each other.

:::{warning}
Any process that consumes a topic channel should not send any outputs to that topic, or else the pipeline will hang forever.
:::

See also: {ref}`process-additional-options` for process outputs.

(channel-value)=

### value
Expand Down
9 changes: 9 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1806,3 +1806,12 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview`
: *Experimental: may change in a future release.*

: When `true`, enables process and workflow recursion. See [this GitHub discussion](https://github.com/nextflow-io/nextflow/discussions/2521) for more information.

`nextflow.preview.topic`

: :::{versionadded} 23.11.0-edge
:::

: *Experimental: may change in a future release.*

: When `true`, enables {ref}`topic channels <channel-topic>`.
81 changes: 39 additions & 42 deletions docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1166,62 +1166,59 @@ process foo {
```
:::

### Optional outputs
(process-additional-options)=

In most cases, a process is expected to produce an output for each output definition. However, there are situations where it is valid for a process to not generate output. In these cases, `optional: true` may be added to the output definition, which tells Nextflow not to fail the process if the declared output is not produced:
### Additional options

```groovy
output:
path("output.txt"), optional: true
```
The following options are available for all process outputs:

In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is legitimately missing, the process does not fail. The output channel will only contain values for those processes that produce `output.txt`.
`emit: <name>`

(process-multiple-outputs)=
: Defines the name of the output channel, which can be used to access the channel by name from the process output:

### Multiple outputs
```groovy
process FOO {
output:
path 'hello.txt', emit: hello
path 'bye.txt', emit: bye

"""
echo "hello" > hello.txt
echo "bye" > bye.txt
"""
}

When a process declares multiple outputs, each output can be accessed by index. The following example prints the second process output (indexes start at zero):
workflow {
FOO()
FOO.out.hello.view()
}
```

```groovy
process FOO {
output:
path 'bye_file.txt'
path 'hi_file.txt'
See {ref}`workflow-process-invocation` for more details.

"""
echo "bye" > bye_file.txt
echo "hi" > hi_file.txt
"""
}
`optional: true | false`

workflow {
FOO()
FOO.out[1].view()
}
```
: Normally, if a specified output is not produced by the task, the task will fail. Setting `optional: true` will cause the task to not fail, and instead emit nothing to the given output channel.

You can also use the `emit` option to assign a name to each output and access them by name:
```groovy
output:
path("output.txt"), optional: true
```

```groovy
process FOO {
output:
path 'bye_file.txt', emit: bye_file
path 'hi_file.txt', emit: hi_file
In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is missing, the task will not fail. The output channel will only contain values for those tasks that produced `output.txt`.

"""
echo "bye" > bye_file.txt
echo "hi" > hi_file.txt
"""
}
: :::{note}
While this option can be used with any process output, it cannot be applied to individual elements of a [tuple](#output-type-tuple) output. The entire tuple must be optional or not optional.
:::

workflow {
FOO()
FOO.out.hi_file.view()
}
```
`topic: <name>`

: :::{versionadded} 23.11.0-edge
:::

: *Experimental: may change in a future release.*

See {ref}`workflow-process-invocation` for more details.
: Defines the {ref}`topic channel <channel-topic>` to which the output will be sent. Cannot be used with the `emit` option on the same output.

## When

Expand Down
4 changes: 2 additions & 2 deletions docs/workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ workflow {
}
```

When a process defines multiple output channels, each output can be accessed using the array element operator (`out[0]`, `out[1]`, etc.) or using *named outputs* (see below).
When a process defines multiple output channels, each output can be accessed by index (`out[0]`, `out[1]`, etc.) or by name (see below).

The process output(s) can also be accessed like the return value of a function:

Expand Down Expand Up @@ -144,7 +144,7 @@ workflow {
}
```

See {ref}`process-multiple-outputs` for more details.
See {ref}`process outputs <process-additional-options>` for more details.

### Process named stdout

Expand Down
5 changes: 5 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Channel.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class Channel {
return CH.queue()
}

static DataflowWriteChannel topic(String name) {
if( !NF.topicChannelEnabled ) throw new MissingMethodException('topic', Channel.class, InvokerHelper.EMPTY_ARGS)
return CH.topic(name)
}

/**
* Create a empty channel i.e. only emits a STOP signal
*
Expand Down
4 changes: 4 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NF.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ class NF {
static boolean isRecurseEnabled() {
NextflowMeta.instance.preview.recursion
}

static boolean isTopicChannelEnabled() {
NextflowMeta.instance.preview.topic
}
}
7 changes: 7 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class NextflowMeta {
volatile float dsl
boolean strict
boolean recursion
boolean topic

void setDsl( float num ) {
if( num == 1 )
Expand All @@ -59,6 +60,12 @@ class NextflowMeta {
log.warn "NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
this.recursion = recurse
}

void setTopic(Boolean value) {
if( topic )
log.warn "TOPIC CHANNELS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
this.topic = value
}
}

static class Features implements Flags {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,15 +915,16 @@ class NextflowDSLImpl implements ASTTransformation {

/**
* Transform a map entry `emit: something` into `emit: 'something'
* and `topic: something` into `topic: 'something'
* (ie. as a constant) in a map expression passed as argument to
* a method call. This allow the syntax
*
* output:
* path 'foo', emit: bar
* path 'foo', emit: bar, topic: baz
*
* @param call
*/
protected void fixOutEmitOption(MethodCallExpression call) {
protected void fixOutEmitAndTopicOptions(MethodCallExpression call) {
List<Expression> args = isTupleX(call.arguments)?.expressions
if( !args ) return
if( args.size()<2 && (args.size()!=1 || call.methodAsString!='_out_stdout')) return
Expand All @@ -936,6 +937,9 @@ class NextflowDSLImpl implements ASTTransformation {
if( key?.text == 'emit' && val ) {
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}
else if( key?.text == 'topic' && val ) {
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}
}
}

Expand All @@ -955,7 +959,7 @@ class NextflowDSLImpl implements ASTTransformation {
// prefix the method name with the string '_out_'
methodCall.setMethod( new ConstantExpression('_out_' + methodName) )
fixMethodCall(methodCall)
fixOutEmitOption(methodCall)
fixOutEmitAndTopicOptions(methodCall)
}

else if( methodName in ['into','mode'] ) {
Expand Down
68 changes: 64 additions & 4 deletions modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package nextflow.extension

import static nextflow.Channel.*

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
Expand All @@ -15,8 +17,6 @@ import nextflow.Channel
import nextflow.Global
import nextflow.NF
import nextflow.Session
import static nextflow.Channel.STOP

/**
* Helper class to handle channel internal api ops
*
Expand All @@ -30,7 +30,15 @@ class CH {
return (Session) Global.session
}

static private Map<DataflowQueue, DataflowBroadcast> bridges = new HashMap<>(10)
static class Topic {
String name
DataflowBroadcast broadcaster = new DataflowBroadcast()
List<DataflowWriteChannel> writers = new ArrayList<>(10)
}

static final private List<Topic> allTopics = new ArrayList<>(10)
bentsherman marked this conversation as resolved.
Show resolved Hide resolved

static final private Map<DataflowQueue, DataflowBroadcast> bridges = new HashMap<>(10)
bentsherman marked this conversation as resolved.
Show resolved Hide resolved

static DataflowReadChannel getReadChannel(channel) {
if (channel instanceof DataflowQueue)
Expand Down Expand Up @@ -66,14 +74,41 @@ class CH {
}

static void broadcast() {
// connect all dataflow queue variables to associated broadcast channel
// connect all broadcast topics, note this must be before the following
// "bridging" step because it can modify the final network topology
connectTopics()
// bridge together all broadcast channels
bridgeChannels()
}

static private void bridgeChannels() {
// connect all dataflow queue variables to associated broadcast channel
for( DataflowQueue queue : bridges.keySet() ) {
log.trace "Bridging dataflow queue=$queue"
def broadcast = bridges.get(queue)
queue.into(broadcast)
}
}

static private void connectTopics() {
for( Topic topic : allTopics ) {
if( topic.writers ) {
// the list of all writing dataflow queues for this topic
final ch = new ArrayList(topic.writers)
// the mix operator requires at least two sources, add an empty channel if needed
if( ch.size()==1 )
ch.add(empty())
// get a list of sources for the mix operator
final sources = ch.collect(it -> getReadChannel(it))
// mix all of them
new MixOp(sources).withTarget(topic.broadcaster).apply()
}
else {
topic.broadcaster.bind(STOP)
}
}
}

static void init() { bridges.clear() }

@PackageScope
Expand Down Expand Up @@ -102,6 +137,31 @@ class CH {
return new DataflowQueue()
}

static DataflowBroadcast topic(String name) {
synchronized (allTopics) {
def topic = allTopics.find(it -> it.name == name)
if( topic!=null )
return topic.broadcaster
// create a new topic
topic = new Topic(name:name)
allTopics.add(topic)
return topic.broadcaster
}
}

static DataflowWriteChannel topicWriter(String name) {
synchronized (allTopics) {
def topic = allTopics.find(it -> it.name == name)
if( topic==null ) {
topic = new Topic(name:name)
allTopics.add(topic)
}
def result = CH.create()
topic.writers.add(result)
return result
}
}

static boolean isChannel(obj) {
obj instanceof DataflowReadChannel || obj instanceof DataflowWriteChannel
}
Expand Down
Loading
Loading