Experiments for adding Flow abstraction #61
Replies: 3 comments 1 reply
-
This seems very promising! My initial idea with streams would be something similar, but letting the caller provide its own "source" and "sink", something like type Flow[In, Out] = (ReadableChannel[In], SendableChannel[Out]) => Async ?=> Unit (where The idea of an operation which closes the channel but doesn't immediately disrupt the reader sounds like something we might consider with the existing channels themselves. It looks every useful! It's only my initial impression though, let me ping @m8nmueller who might have more in-depth thoughts about streaming! |
Beta Was this translation helpful? Give feedback.
-
FYI, the current Java 22's Gather API does not define how to interactive timer/outer world, the new api in gears should support this. If we can built all this right, we can even build a Java wrapper for Java user:). Thank you @tassiluca for sharing, but seems the API a little like the Kotlin https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt I would like to see this api works in a Reactive way(dynamic push pull), the current pulling patten in Kotlin Flow has bad performance. |
Beta Was this translation helpful? Give feedback.
-
Hi there! @tassiluca The idea of the You can find my implementation idea here: https://github.com/m8nmueller/gears/blob/streams/shared/src/main/scala/async/Stream.scala. I will continue working on this as part of my bachelor project at EPFL. @He-Pin The Gather API seems like an interesting approach to allow custom transformers. I'll have a look at both the JEP and your work on Pekko. What exactly do you mean by "interactive timer/outer world"? I will mostly focus on pushing streams using bounded buffers as means of back-pressure, as proposed by Martin Odersky. |
Beta Was this translation helpful? Give feedback.
-
Hi!
I'm opening this discussion (hopefully in the right place) to share some experiments I did to introduce cold
Flow
s in the codebase for modeling a stream of asynchronously computed values.I don't know if they can be useful, but I think it's worth sharing them.
Flow
s can be implemented using aTask
and aChannel
:Flow
creation, the client provides a block of code that emits values, which is wrapped inside aTask
that isstart
ed only when thecollect
method is called;emit
ted values are sent on aChannel
that is created when thecollect
method is called.Some notes:
TerminableChannel
is a specialization ofChannel
allowing toterminate
it, preventing further values from being sent, while still allowing toread
the values that have been sent before the termination (currently,read
returnsLeft(Closed)
once the channel is closed)foreach
method can be implemented to react to each sent value suspending the execution until all items have been consumedbody
throws an exception, it is sent on the channel wrapped inside aFailure
collect
method could be called concurrently by multiple threads asynchronized
block and aSemaphore
are used to ensure the task can get the correct newly generated channel instance before someone else can replace it with another instanceemit
function signature could be turned to be non-suspendable usingsendImmediately
on something like aTerminableChannel
backing to anUnboundedChannel
To create a
Flow
:Client-side:
The output will be (the square brackets are my annotations containing the time elapsed since the previous message):
In case of failures, the flow stops:
The output:
On top of
Flow
it is possible to implement functions likemap
andflatMap
:For instance, the following code:
results in:
flatMap
waits for the inner flow to complete before collecting the next item:Result:
Beta Was this translation helpful? Give feedback.
All reactions