diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java index 6eebd676c..84338d1df 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java @@ -84,7 +84,18 @@ public static int assertMtu(int mtu) { @Override public Mono send(Publisher frames) { - return Flux.from(frames).concatMap(this::sendOne).then(); + return delegate.send( + Flux.from(frames) + .concatMap( + frame -> { + FrameType frameType = FrameHeaderCodec.frameType(frame); + int readableBytes = frame.readableBytes(); + if (!shouldFragment(frameType, readableBytes)) { + return Flux.just(frame); + } + + return logFragments(Flux.from(fragmentFrame(alloc(), mtu, frame, frameType))); + })); } @Override @@ -95,6 +106,11 @@ public Mono sendOne(ByteBuf frame) { return delegate.sendOne(frame); } Flux fragments = Flux.from(fragmentFrame(alloc(), mtu, frame, frameType)); + fragments = logFragments(fragments); + return delegate.send(fragments); + } + + protected Flux logFragments(Flux fragments) { if (logger.isDebugEnabled()) { fragments = fragments.doOnNext( @@ -107,6 +123,6 @@ public Mono sendOne(ByteBuf frame) { ByteBufUtil.prettyHexDump(byteBuf)); }); } - return delegate.send(fragments); + return fragments; } }