You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The Rocket server should respond to the client. Hello from RSocket consumer
Actual Behavior
Throwing exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Source has to be ASYNC fuseable
Error stack trace
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Source has to be ASYNC fuseable
Caused by: java.lang.IllegalStateException: Source has to be ASYNC fuseable
at io.rsocket.resume.InMemoryResumableFramesStore$FramesSubscriber.onSubscribe(InMemoryResumableFramesStore.java:528) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104) ~[reactor-core-3.6.1.jar:3.6.1]
at io.rsocket.internal.UnboundedProcessor.subscribe(UnboundedProcessor.java:414) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals.subscribe(FluxContextWriteRestoringThreadLocals.java:46) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.Mono.subscribe(Mono.java:4512) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.Mono.subscribe(Mono.java:4339) ~[reactor-core-3.6.1.jar:3.6.1]
at io.rsocket.resume.ResumableDuplexConnection.<init>(ResumableDuplexConnection.java:83) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.ServerSetup$ResumableServerSetup.acceptRSocketSetup(ServerSetup.java:124) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.RSocketServer.acceptSetup(RSocketServer.java:418) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.RSocketServer.accept(RSocketServer.java:386) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.RSocketServer.lambda$acceptor$0(RSocketServer.java:370) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:332) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:181) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176) ~[reactor-core-3.6.1.jar:3.6.1]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:115) ~[rsocket-core-1.1.3.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.3.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118) ~[reactor-core-3.6.1.jar:3.6.1]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.14.jar:1.1.14]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
@abdulrahimseera Hey! I had the same problem.
The fix was to downgrade reactor-core lib to 3.5.16. implementation("io.projectreactor:reactor-core:3.5.16")
The problem was in Mono subscribe method. In 3.6.x version they changed lines publisher.subscribe(subscriber); to
Operators.restoreContextOnSubscriberIfPublisherNonInternal returns FuseableContextWriteRestoringThreadLocalsSubscriber
In that class method requestFusion returns 0.
And that is why we have that exception in rsocket from InMemoryResumableFramesStore
See onSubscribe method qs.requestFusion(ANY); this return 0 from FuseableContextWriteRestoringThreadLocalsSubscriber
Expected Behavior
The Rocket server should respond to the client.
Hello from RSocket consumer
Actual Behavior
Throwing exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Source has to be ASYNC fuseable
Error stack trace
Steps to Reproduce
Sample project to reproduce the issue: https://github.com/abdulrahimseera/rsocket-spring-boot-3.2.x
Needs to run both services and then call "http://localhost:3333/hello/rsocket".
Possible Solution
Your Environment
netty
, ...): rsocket-transport-netty 1.1.3 & rsocket-micrometer 1.1.4javar -version
) or Node version (node --version
)): JDK 17 with Spring boot 3.2.1uname -a
): MacOs Venture Version 13.6.3 (22G436)The text was updated successfully, but these errors were encountered: