Skip to content

Commit

Permalink
Rework bungee injector to allow for packet cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
booky10 committed Sep 20, 2024
1 parent 909492c commit a17e5f1
Showing 1 changed file with 50 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,33 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner;
import net.md_5.bungee.api.connection.ProxiedPlayer;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

// Thanks to ViaVersion for the compression method.
@ChannelHandler.Sharable
public class PacketEventsEncoder extends MessageToMessageEncoder<ByteBuf> {
public class PacketEventsEncoder extends ChannelOutboundHandlerAdapter {

private static final Recycler<List<Object>> OUT_LIST_RECYCLER = new Recycler<List<Object>>() {
@Override
protected List<Object> newObject(Handle<List<Object>> handle) {
// the default bungee compressor only produces one output bytebuf
return new ArrayList<>(1);
}
};

public ProxiedPlayer player;
public User user;
public boolean handledCompression;
Expand All @@ -45,7 +62,7 @@ public PacketEventsEncoder(User user) {
this.user = user;
}

public void read(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
public void read(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promise) throws Exception {
boolean doCompression = handleCompressionOrder(ctx, buffer);
int firstReaderIndex = buffer.readerIndex();
PacketSendEvent packetSendEvent = EventCreationUtil.createSendEvent(ctx.channel(), user, player,
Expand All @@ -62,12 +79,12 @@ public void read(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) th
buffer.readerIndex(firstReaderIndex);
}
if (doCompression) {
recompress(ctx, buffer, out);
this.recompress(ctx, buffer, promise);
} else {
out.add(buffer.retain());
ctx.write(buffer, promise);
}
} else {
ByteBufHelper.clear(packetSendEvent.getByteBuf());
ReferenceCountUtil.release(packetSendEvent.getByteBuf());
}
if (packetSendEvent.hasPostTasks()) {
for (Runnable task : packetSendEvent.getPostTasks()) {
Expand All @@ -77,16 +94,17 @@ public void read(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) th
}

@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
if (!msg.isReadable()) {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (!(msg instanceof ByteBuf)) {
super.write(ctx, msg, promise);
return;
}
read(ctx, msg, out);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
buf.release();
} else {
this.read(ctx, buf, promise);
}
}

private boolean handleCompressionOrder(ChannelHandlerContext ctx, ByteBuf buffer) {
Expand Down Expand Up @@ -125,12 +143,27 @@ private boolean handleCompressionOrder(ChannelHandlerContext ctx, ByteBuf buffer
return false;
}

private void recompress(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
ChannelHandler compressor = ctx.pipeline().get("compress");
private void recompress(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promise) {
List<Object> out = OUT_LIST_RECYCLER.get();
try {
ChannelHandler compressor = ctx.pipeline().get("compress");
CustomPipelineUtil.callPacketEncodeByteBuf(compressor, ctx, buffer, out);
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (InvocationTargetException exception) {
throw new EncoderException("Error while recompressing bytebuf " + buffer.readableBytes(), exception);
}

int len = out.size();
if (len == 1) {
// should be the only case which
// happens on vanilla bungeecord
ctx.write(out.get(0), promise);
} else {
// copied from MessageToMessageEncoder#writePromiseCombiner
PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
for (int i = 0; i < len; i++) {
combiner.add(ctx.write(out.get(i)));
}
combiner.finish(promise);
}
}
}

0 comments on commit a17e5f1

Please sign in to comment.