Discussion: allow flowable to process multiple values simultaneously #3
Comments
Hi Clément, thanks for your interest and your use-case! If you have a Consumable of "values" and you want to process them in parallel, you can call:

```ts
values.subscribe(async ({ value, next }) => {
  await server.hasCapacity;
  server.process(value).catch(console.error);
  next(); // called immediately, without waiting for processing to finish
});
```

(You would need to consider what to really do with errors.) I think in most real cases the "server" here would not be "this server", i.e. this very Node process, because the heavy lifting is likely to happen somewhere else, like an analytical engine. So having a utility in this library to throttle based on local memory or CPU is probably not the right abstraction, unless it's just an example. Possibly we could have a …
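The `server.hasCapacity` gate above could be backed by a simple counting semaphore. This is only a sketch of one way to build such a gate; the `Semaphore` class and `demo` function are hypothetical and not part of the library:

```typescript
// Minimal counting semaphore: acquire() resolves once a slot is free,
// release() frees a slot (handing it directly to the next waiter, if any).
class Semaphore {
  private waiters: Array<() => void> = [];
  constructor(private slots: number) {}

  async acquire(): Promise<void> {
    if (this.slots > 0) {
      this.slots--;
      return;
    }
    await new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) next();
    else this.slots++;
  }
}

// Usage sketch: at most 2 jobs in flight at once, out of 5 submitted.
async function demo(): Promise<number> {
  const sem = new Semaphore(2);
  let running = 0;
  let peak = 0;
  const jobs = [1, 2, 3, 4, 5].map(async () => {
    await sem.acquire();
    running++;
    peak = Math.max(peak, running);
    await new Promise((r) => setTimeout(r, 10)); // simulated work
    running--;
    sem.release();
  });
  await Promise.all(jobs);
  return peak; // never exceeds the slot count
}
```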
Hi sir, thanks a lot for your answer and demo! (amazing btw)
Totally agree.
That would be a great addition; Rx is definitely missing some kind of …
Thinking about it, the operator could take just the … Taking inspiration from RxJS operators like delayWhen, it would make sense to have the … So, I think the operator could look like:

```ts
throttleWhen<T>(capacitySignal: (value: T, index: number) => Observable<any>): OperatorFunction<Bite<T>, T>
```

I'm a bit backlogged at the moment, but if you'd like to have a go I'd be happy to review!
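To make the intended contract concrete, here is the same idea expressed over async iterables rather than Observables. This is an illustrative sketch only, not the library's implementation, and `throttleWhen` here is a plain async generator rather than the proposed RxJS operator:

```typescript
// Sketch: release each value only after capacitySignal resolves for it,
// mirroring the proposed (value, index) => signal contract.
async function* throttleWhen<T>(
  source: Iterable<T>,
  capacitySignal: (value: T, index: number) => Promise<unknown>,
): AsyncGenerator<T> {
  let index = 0;
  for (const value of source) {
    await capacitySignal(value, index++); // wait until there is capacity
    yield value;
  }
}

// Usage sketch: a signal that always grants capacity passes values through.
async function demo(): Promise<number[]> {
  const out: number[] = [];
  for await (const v of throttleWhen([1, 2, 3], () => Promise.resolve())) {
    out.push(v);
  }
  return out;
}
```

In the real operator the signal would typically resolve based on memory, CPU, or an in-flight counter instead of immediately.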
That would be a nice design! For reference, here is what I ended up doing in the meantime. It's very ugly, but hey, it works for now :D I will definitely give this proper …
All riiiiight. So, long story short: the previous version silently swallowed all exceptions, so I rewrote it using … Far from perfect, but that's a start.
Hi there,
I just stumbled upon your trail of thoughts regarding rxjs and backpressure:
I built some kind of ETL using RxJS and I wanted to process "as much values as possible at the same time".
At the moment I have a pipeline (Observable) taking work units as input, doing lots of complex stuff and yielding success/failure status as output.
Obviously, I find myself in a position where I can't ask the pipeline to process all work units at once, so I need to do some kind of backpressure handling.
My idea would be to use a threshold on `process.memoryUsage()` and `process.cpuUsage()` to know whether the server can handle more work, and if so, send more work units into the observable without waiting for the current work units to be done.

I'm not sure that would fit the scope of this library, but if it does, I would be happy to contribute.
Thank you for your work <3