diff --git a/rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicConnectionHandler.kt b/rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicConnectionHandler.kt index 5110359d..1066ffee 100644 --- a/rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicConnectionHandler.kt +++ b/rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicConnectionHandler.kt @@ -81,7 +81,7 @@ private class NettyQuicConnectionInboundHandler( // Note: QUIC streams could be received unordered, so f.e we could receive first stream with id 4 and then with id 0 override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { msg as QuicStreamChannel - val state = NettyQuicStreamState(false) + val state = NettyQuicStreamState(null) if (inbound.trySend(state.wrapStream(msg)).isSuccess) { msg.pipeline().addLast(NettyQuicStreamInitializer(streamsContext, state, isClient)) } @@ -103,10 +103,17 @@ private class NettyQuicConnection( private val streamsContext: CoroutineContext, private val isClient: Boolean, ) : RSocketMultiplexedConnection { + private val startMarker = Job() // we need to `hack` only first stream created for client - stream where frames with streamId=0 will be sent private val first = AtomicBoolean(isClient) override suspend fun createStream(): RSocketMultiplexedConnection.Stream { - val state = NettyQuicStreamState(first.getAndSet(false)) + val startMarker = if (first.getAndSet(false)) { + startMarker + } else { + startMarker.join() + null + } + val state = NettyQuicStreamState(startMarker) val stream = try { channel.createStream( QuicStreamType.BIDIRECTIONAL, diff --git a/rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicStreamHandler.kt b/rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicStreamHandler.kt index e1230772..ba88d923 100644 --- a/rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicStreamHandler.kt +++ b/rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicStreamHandler.kt @@ -28,16 +28,14 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.channels.Channel -// TODO: first is a hack to initiate first stream because of buffering +// TODO: first stream is a hack to initiate first stream because of buffering // quic streams could be received unordered by server, so f.e we could receive first stream with id 4 and then with id 0 // for this, we disable buffering for first client stream, so that first frame will be sent first // this will affect performance for this stream, so we need to do something else here. @RSocketTransportApi -internal class NettyQuicStreamState(first: Boolean) { +internal class NettyQuicStreamState(val startMarker: CompletableJob?) { val closeMarker: CompletableJob = Job() - val outbound = channelForCloseable( - if (first) Channel.RENDEZVOUS else Channel.BUFFERED - ) + val outbound = channelForCloseable(Channel.BUFFERED) val inbound = channelForCloseable(Channel.UNLIMITED) fun wrapStream(stream: QuicStreamChannel): RSocketMultiplexedConnection.Stream = @@ -68,6 +66,7 @@ internal class NettyQuicStreamHandler( channel.flush() // await writing to respect transport backpressure lastWriteFuture.awaitFuture() + state.startMarker?.complete() } } finally { withContext(NonCancellable) {