Skip to content

Commit

Permalink
Zero copy (#444)
Browse files Browse the repository at this point in the history
* Added configurable frame decoder

* Test frame decoder with ping pong
  • Loading branch information
Ryland Degnan authored and robertroeser committed Nov 12, 2017
1 parent 3b53d5d commit a83a173
Show file tree
Hide file tree
Showing 43 changed files with 769 additions and 441 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.PayloadImpl;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;
import java.net.URI;
Expand All @@ -72,7 +72,7 @@ public class ExampleClient {
RSocket client = RSocketFactory.connect().keepAlive().transport(ws).start().block();
try {
Flux<Payload> s = client.requestStream(PayloadImpl.textPayload("peace"));
Flux<Payload> s = client.requestStream(DefaultPayload.textPayload("peace"));
s.take(10).doOnNext(p -> System.out.println(p.getDataUtf8())).blockLast();
} finally {
Expand Down
6 changes: 3 additions & 3 deletions rsocket-core/src/jmh/java/io/rsocket/FragmentationPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.rsocket.fragmentation.FrameFragmenter;
import io.rsocket.fragmentation.FrameReassembler;
import io.rsocket.util.PayloadImpl;
import io.rsocket.util.DefaultPayload;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -43,13 +43,13 @@ public void setup(Blackhole bh) {
ByteBuffer data = createRandomBytes(1 << 18);
ByteBuffer metadata = createRandomBytes(1 << 18);
largeFrame =
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, new PayloadImpl(data, metadata), 1);
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
largeFrameFragmenter = new FrameFragmenter(1024);

data = createRandomBytes(16);
metadata = createRandomBytes(16);
smallFrame =
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, new PayloadImpl(data, metadata), 1);
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
smallFrameFragmenter = new FrameFragmenter(2);
smallFramesIterable =
smallFrameFragmenter
Expand Down
5 changes: 3 additions & 2 deletions rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import io.rsocket.RSocketFactory.Start;
import io.rsocket.perfutil.TestDuplexConnection;
import io.rsocket.util.PayloadImpl;
import io.rsocket.util.DefaultPayload;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -79,7 +80,7 @@ public static class Input {

static final ByteBuffer HELLO = ByteBuffer.wrap("HELLO".getBytes(StandardCharsets.UTF_8));

static final Payload HELLO_PAYLOAD = new PayloadImpl(HELLO);
static final Payload HELLO_PAYLOAD = new DefaultPayload(HELLO);

static final DirectProcessor<Frame> clientReceive = DirectProcessor.create();
static final DirectProcessor<Frame> serverReceive = DirectProcessor.create();
Expand Down
86 changes: 62 additions & 24 deletions rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,50 @@

import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.AbstractReferenceCounted;
import io.rsocket.Frame.Setup;
import io.rsocket.frame.SetupFrameFlyweight;
import java.nio.ByteBuffer;

/**
* Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data
*/
public abstract class ConnectionSetupPayload implements Payload {
public abstract class ConnectionSetupPayload extends AbstractReferenceCounted implements Payload {

public static final int NO_FLAGS = 0;
public static final int HONOR_LEASE = SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE;
public static final int STRICT_INTERPRETATION = SetupFrameFlyweight.FLAGS_STRICT_INTERPRETATION;

public static ConnectionSetupPayload create(String metadataMimeType, String dataMimeType) {
return new ConnectionSetupPayloadImpl(
metadataMimeType, dataMimeType, Frame.NULL_BYTEBUFFER, Frame.NULL_BYTEBUFFER, NO_FLAGS);
return new DefaultConnectionSetupPayload(
metadataMimeType, dataMimeType, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, NO_FLAGS);
}

public static ConnectionSetupPayload create(
String metadataMimeType, String dataMimeType, Payload payload) {
return new ConnectionSetupPayloadImpl(
return new DefaultConnectionSetupPayload(
metadataMimeType,
dataMimeType,
payload.getData(),
payload.getMetadata(),
payload.sliceData(),
payload.sliceMetadata(),
payload.hasMetadata() ? FLAGS_M : 0);
}

public static ConnectionSetupPayload create(
String metadataMimeType, String dataMimeType, int flags) {
return new ConnectionSetupPayloadImpl(
metadataMimeType, dataMimeType, Frame.NULL_BYTEBUFFER, Frame.NULL_BYTEBUFFER, flags);
return new DefaultConnectionSetupPayload(
metadataMimeType, dataMimeType, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, flags);
}

public static ConnectionSetupPayload create(final Frame setupFrame) {
Frame.ensureFrameType(FrameType.SETUP, setupFrame);
return new ConnectionSetupPayloadImpl(
Frame.Setup.metadataMimeType(setupFrame),
Frame.Setup.dataMimeType(setupFrame),
setupFrame.getData(),
setupFrame.getMetadata(),
Frame.Setup.getFlags(setupFrame));
return new DefaultConnectionSetupPayload(
Setup.metadataMimeType(setupFrame),
Setup.dataMimeType(setupFrame),
setupFrame.sliceData(),
setupFrame.sliceMetadata(),
Setup.getFlags(setupFrame));
}

public abstract String metadataMimeType();
Expand All @@ -79,27 +82,42 @@ public boolean hasMetadata() {
return Frame.isFlagSet(getFlags(), FLAGS_M);
}

private static final class ConnectionSetupPayloadImpl extends ConnectionSetupPayload {
@Override
public ConnectionSetupPayload retain() {
super.retain();
return this;
}

@Override
public ConnectionSetupPayload retain(int increment) {
super.retain(increment);
return this;
}

public abstract ConnectionSetupPayload touch();
public abstract ConnectionSetupPayload touch(Object hint);

private static final class DefaultConnectionSetupPayload extends ConnectionSetupPayload {

private final String metadataMimeType;
private final String dataMimeType;
private final ByteBuffer data;
private final ByteBuffer metadata;
private final ByteBuf data;
private final ByteBuf metadata;
private final int flags;

public ConnectionSetupPayloadImpl(
public DefaultConnectionSetupPayload(
String metadataMimeType,
String dataMimeType,
ByteBuffer data,
ByteBuffer metadata,
ByteBuf data,
ByteBuf metadata,
int flags) {
this.metadataMimeType = metadataMimeType;
this.dataMimeType = dataMimeType;
this.data = data;
this.metadata = metadata;
this.flags = flags;

if (!hasMetadata() && metadata.remaining() > 0) {
if (!hasMetadata() && metadata.readableBytes() > 0) {
throw new IllegalArgumentException("metadata flag incorrect");
}
}
Expand All @@ -115,18 +133,38 @@ public String dataMimeType() {
}

@Override
public ByteBuffer getData() {
public ByteBuf sliceData() {
return data;
}

@Override
public ByteBuffer getMetadata() {
public ByteBuf sliceMetadata() {
return metadata;
}

@Override
public int getFlags() {
return flags;
}

@Override
public ConnectionSetupPayload touch() {
data.touch();
metadata.touch();
return this;
}

@Override
public ConnectionSetupPayload touch(Object hint) {
data.touch(hint);
metadata.touch(hint);
return this;
}

@Override
protected void deallocate() {
data.release();
metadata.release();
}
}
}
70 changes: 19 additions & 51 deletions rsocket-core/src/main/java/io/rsocket/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.rsocket.frame.RequestNFrameFlyweight;
import io.rsocket.frame.SetupFrameFlyweight;
import io.rsocket.frame.VersionFlyweight;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.slf4j.Logger;
Expand All @@ -41,9 +40,7 @@
*
* <p>This provides encoding, decoding and field accessors.
*/
public class Frame implements ByteBufHolder {
public static final ByteBuffer NULL_BYTEBUFFER = ByteBuffer.allocateDirect(0);

public class Frame implements Payload, ByteBufHolder {
private static final Recycler<Frame> RECYCLER =
new Recycler<Frame>() {
protected Frame newObject(Handle<Frame> handle) {
Expand All @@ -52,7 +49,7 @@ protected Frame newObject(Handle<Frame> handle) {
};

private final Handle<Frame> handle;
private @Nullable ByteBuf content;
private ByteBuf content;

private Frame(final Handle<Frame> handle) {
this.handle = handle;
Expand Down Expand Up @@ -183,43 +180,25 @@ public boolean release(int decrement) {
}

/**
* Return {@link ByteBuffer} that is a {@link ByteBuffer#slice()} for the frame metadata
* Return {@link ByteBuf} that is a {@link ByteBuf#slice()} for the frame metadata
*
* <p>If no metadata is present, the ByteBuffer will have 0 capacity.
* <p>If no metadata is present, the ByteBuf will have 0 capacity.
*
* @return ByteBuffer containing the content
* @return ByteBuf containing the content
*/
public ByteBuffer getMetadata() {
final ByteBuf metadata = FrameHeaderFlyweight.sliceFrameMetadata(content);
if (metadata == null) {
return NULL_BYTEBUFFER;
} else if (metadata.readableBytes() > 0) {
final ByteBuffer buffer = ByteBuffer.allocateDirect(metadata.readableBytes());
metadata.readBytes(buffer);
buffer.flip();
return buffer;
} else {
return NULL_BYTEBUFFER;
}
public ByteBuf sliceMetadata() {
return hasMetadata() ? FrameHeaderFlyweight.sliceFrameMetadata(content) : Unpooled.EMPTY_BUFFER;
}

/**
* Return {@link ByteBuffer} that is a {@link ByteBuffer#slice()} for the frame data
* Return {@link ByteBuf} that is a {@link ByteBuf#slice()} for the frame data
*
* <p>If no data is present, the ByteBuffer will have 0 capacity.
* <p>If no data is present, the ByteBuf will have 0 capacity.
*
* @return ByteBuffer containing the data
* @return ByteBuf containing the data
*/
public ByteBuffer getData() {
final ByteBuf data = FrameHeaderFlyweight.sliceFrameData(content);
if (data.readableBytes() > 0) {
final ByteBuffer buffer = ByteBuffer.allocateDirect(data.readableBytes());
data.readBytes(buffer);
buffer.flip();
return buffer;
} else {
return NULL_BYTEBUFFER;
}
public ByteBuf sliceData() {
return FrameHeaderFlyweight.sliceFrameData(content);
}

/**
Expand Down Expand Up @@ -270,14 +249,11 @@ public static int setFlag(int current, int toSet) {
return current | toSet;
}

@Override
public boolean hasMetadata() {
return Frame.isFlagSet(this.flags(), FLAGS_M);
}

public String getDataUtf8() {
return StandardCharsets.UTF_8.decode(getData()).toString();
}

/* TODO:
*
* fromRequest(type, id, payload)
Expand All @@ -297,14 +273,8 @@ public static Frame from(
String metadataMimeType,
String dataMimeType,
Payload payload) {
final ByteBuf metadata =
payload.hasMetadata()
? Unpooled.wrappedBuffer(payload.getMetadata())
: Unpooled.EMPTY_BUFFER;
final ByteBuf data =
payload.getData() != null
? Unpooled.wrappedBuffer(payload.getData())
: Unpooled.EMPTY_BUFFER;
final ByteBuf metadata = payload.hasMetadata() ? payload.sliceMetadata() : Unpooled.EMPTY_BUFFER;
final ByteBuf data = payload.sliceData();

final Frame frame = RECYCLER.get();
frame.content =
Expand Down Expand Up @@ -460,9 +430,8 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
if (initialRequestN < 1) {
throw new IllegalStateException("initial request n must be greater than 0");
}
final @Nullable ByteBuf metadata =
payload.hasMetadata() ? Unpooled.wrappedBuffer(payload.getMetadata()) : null;
final ByteBuf data = Unpooled.wrappedBuffer(payload.getData());
final @Nullable ByteBuf metadata = payload.hasMetadata() ? payload.sliceMetadata() : null;
final ByteBuf data = payload.sliceData();

final Frame frame = RECYCLER.get();
frame.content =
Expand Down Expand Up @@ -561,9 +530,8 @@ public static Frame from(int streamId, FrameType type, Payload payload) {
}

public static Frame from(int streamId, FrameType type, Payload payload, int flags) {
final ByteBuf metadata =
payload.hasMetadata() ? Unpooled.wrappedBuffer(payload.getMetadata()) : null;
final ByteBuf data = Unpooled.wrappedBuffer(payload.getData());
final ByteBuf metadata = payload.hasMetadata() ? payload.sliceMetadata() : null;
final ByteBuf data = payload.sliceData();
return from(streamId, type, metadata, data, flags);
}

Expand Down
Loading

0 comments on commit a83a173

Please sign in to comment.