diff --git a/urban-spork-common/src/com/urbanspork/common/channel/AttributeKeys.java b/urban-spork-common/src/com/urbanspork/common/channel/AttributeKeys.java index 9b1308c..eae1fe9 100644 --- a/urban-spork-common/src/com/urbanspork/common/channel/AttributeKeys.java +++ b/urban-spork-common/src/com/urbanspork/common/channel/AttributeKeys.java @@ -6,6 +6,7 @@ public class AttributeKeys { public static final AttributeKey SERVER_CONFIG = AttributeKey.newInstance("SERVER_CONFIG"); + public static final AttributeKey SERVER_UDP_RELAY_WORKER = AttributeKey.newInstance("SERVER_UDP_RELAY_WORKER"); private AttributeKeys() {} } diff --git a/urban-spork-common/src/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecImpl.java b/urban-spork-common/src/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecImpl.java index 3163fa9..1bc9beb 100644 --- a/urban-spork-common/src/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecImpl.java +++ b/urban-spork-common/src/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecImpl.java @@ -5,6 +5,7 @@ import com.urbanspork.common.protocol.shadowsocks.aead.AEAD; import com.urbanspork.common.protocol.socks.Address; import com.urbanspork.common.transport.udp.RelayingPacket; +import com.urbanspork.common.util.Dice; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.bouncycastle.crypto.InvalidCipherTextException; @@ -30,7 +31,7 @@ public AeadCipherCodecImpl(CipherMethod cipherMethod, Keys keys) { @Override public void encode(Context context, ByteBuf msg, ByteBuf out) throws InvalidCipherTextException { InetSocketAddress address = context.address(); - byte[] salt = context.control().salt(); + byte[] salt = Dice.rollBytes(cipherMethod.keySize()); out.writeBytes(salt); ByteBuf temp = Unpooled.buffer(Address.getLength(address)); Address.encode(address, temp); @@ -40,7 +41,7 @@ public void encode(Context context, ByteBuf msg, ByteBuf out) throws InvalidCiph @Override public RelayingPacket decode(Context context, ByteBuf in) throws InvalidCipherTextException { - byte[] salt = context.control().salt(); + byte[] salt = new byte[cipherMethod.keySize()]; in.readBytes(salt); ByteBuf packet = AEAD.UDP.newPayloadDecoder(cipherMethod, keys.encKey(), salt).decodePacket(in); InetSocketAddress address = Address.decode(packet); diff --git a/urban-spork-common/src/com/urbanspork/common/codec/shadowsocks/udp/UdpRelayCodec.java b/urban-spork-common/src/com/urbanspork/common/codec/shadowsocks/udp/UdpRelayCodec.java index 2979794..0ef8ab9 100644 --- a/urban-spork-common/src/com/urbanspork/common/codec/shadowsocks/udp/UdpRelayCodec.java +++ b/urban-spork-common/src/com/urbanspork/common/codec/shadowsocks/udp/UdpRelayCodec.java @@ -1,18 +1,23 @@ package com.urbanspork.common.codec.shadowsocks.udp; +import com.urbanspork.common.channel.AttributeKeys; import com.urbanspork.common.codec.shadowsocks.Mode; import com.urbanspork.common.config.ServerConfig; import com.urbanspork.common.manage.shadowsocks.ServerUserManager; import com.urbanspork.common.protocol.shadowsocks.Control; +import com.urbanspork.common.protocol.shadowsocks.replay.PacketWindowFilter; import com.urbanspork.common.transport.udp.DatagramPacketWrapper; import com.urbanspork.common.transport.udp.RelayingPacket; import com.urbanspork.common.util.LruCache; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.EncoderException; import io.netty.handler.codec.MessageToMessageCodec; +import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,22 +25,19 @@ import java.time.Duration; import java.util.List; +@ChannelHandler.Sharable public class UdpRelayCodec extends MessageToMessageCodec { private static final Logger logger = LoggerFactory.getLogger(UdpRelayCodec.class); - private final ServerConfig config; + private static final AttributeKey CONTROL = AttributeKey.valueOf(UdpRelayCodec.class, Control.class.getSimpleName()); private final Mode mode; private final ServerUserManager userManager; private final AeadCipherCodec cipher; - private final LruCache controlMap; - private final Control control0; + private final LruCache filters = new LruCache<>(1024, Duration.ofMinutes(5), (k, v) -> logger.trace("{} expire", k)); public UdpRelayCodec(ServerConfig config, Mode mode, ServerUserManager userManager) { - this.config = config; this.mode = mode; this.userManager = userManager; this.cipher = AeadCipherCodecs.get(config); - this.controlMap = new LruCache<>(1024, Duration.ofMinutes(5), (k, v) -> logger.info("[udp]control map expire {}={}", k, v)); - this.control0 = new Control(config.getCipher()); } @Override @@ -46,7 +48,10 @@ protected void encode(ChannelHandlerContext ctx, DatagramPacketWrapper msg, List } ByteBuf in = Unpooled.buffer(); DatagramPacket data = msg.packet(); - Control control = getControl(proxy); + Control control = ctx.channel().attr(CONTROL).get(); + if (control == null) { + control = new Control(); + } control.increasePacketId(1); logger.trace("[udp][{}][encode]{}|{}", mode, proxy, control); cipher.encode(new Context(mode, control, data.recipient(), userManager), data.content(), in); @@ -55,27 +60,26 @@ protected void encode(ChannelHandlerContext ctx, DatagramPacketWrapper msg, List @Override protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List out) throws Exception { - Control control = getControl(msg.sender()); + Control control = new Control(0, 0, 0); Context context = new Context(mode, control, null, userManager); RelayingPacket packet = cipher.decode(context, msg.content()); logger.trace("[udp][{}][decode]{}|{}", mode, msg.sender(), control); - if (cipher instanceof Aead2022CipherCodecImpl && !control.validatePacketId()) { - logger.error("[udp][{}→]{} packet_id {} out of window", mode, msg.sender(), control.getPacketId()); - return; + Channel channel = ctx.channel(); + channel.attr(CONTROL).set(control); + if (mode == Mode.Server) { + long clientSessionId = control.getClientSessionId(); + channel.attr(AttributeKeys.SERVER_UDP_RELAY_WORKER).set(clientSessionId); + PacketWindowFilter filter = filters.computeIfAbsent(clientSessionId, k -> new PacketWindowFilter()); + if (cipher instanceof Aead2022CipherCodecImpl && !filter.validatePacketId(control.getPacketId(), Long.MAX_VALUE)) { + logger.error("packet id out of window, {}→{}|{}", msg.sender(), packet.address(), control); + return; + } } out.add(new DatagramPacket(packet.content(), packet.address(), msg.sender())); } @Override public void handlerRemoved(ChannelHandlerContext ctx) { - controlMap.release(); - } - - private Control getControl(InetSocketAddress key) { - if (Mode.Client == mode) { - return control0; - } else { - return controlMap.computeIfAbsent(key, k -> new Control(config.getCipher())); - } + filters.release(); } } \ No newline at end of file diff --git a/urban-spork-common/src/com/urbanspork/common/protocol/shadowsocks/Control.java b/urban-spork-common/src/com/urbanspork/common/protocol/shadowsocks/Control.java index 963ba67..8ecdaec 100644 --- a/urban-spork-common/src/com/urbanspork/common/protocol/shadowsocks/Control.java +++ b/urban-spork-common/src/com/urbanspork/common/protocol/shadowsocks/Control.java @@ -1,30 +1,23 @@ package com.urbanspork.common.protocol.shadowsocks; -import com.urbanspork.common.codec.CipherKind; import com.urbanspork.common.manage.shadowsocks.ServerUser; -import com.urbanspork.common.protocol.shadowsocks.replay.PacketWindowFilter; -import com.urbanspork.common.util.Dice; import java.util.concurrent.ThreadLocalRandom; public class Control { - private final byte[] salt; private long clientSessionId; private long serverSessionId; private long packetId; private ServerUser user; - private final PacketWindowFilter packetWindowFilter; - public Control(CipherKind kind) { - this(Dice.rollBytes(kind.keySize()), ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong(), 0); + public Control() { + this(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong(), 0); } - Control(byte[] salt, long clientSessionId, long serverSessionId, long packetId) { - this.salt = salt; + public Control(long clientSessionId, long serverSessionId, long packetId) { this.clientSessionId = clientSessionId; this.serverSessionId = serverSessionId; this.packetId = packetId; - this.packetWindowFilter = new PacketWindowFilter(); } public void increasePacketId(long i) { @@ -40,14 +33,6 @@ public void increasePacketId(long i) { } } - public boolean validatePacketId() { - return packetWindowFilter.validatePacketId(packetId, Long.MAX_VALUE); - } - - public byte[] salt() { - return salt; - } - public long getPacketId() { return packetId; } diff --git a/urban-spork-server/src/com/urbanspork/server/ServerUdpRelayHandler.java b/urban-spork-server/src/com/urbanspork/server/ServerUdpRelayHandler.java index 1c28150..a56e067 100644 --- a/urban-spork-server/src/com/urbanspork/server/ServerUdpRelayHandler.java +++ b/urban-spork-server/src/com/urbanspork/server/ServerUdpRelayHandler.java @@ -1,5 +1,6 @@ package com.urbanspork.server; +import com.urbanspork.common.channel.AttributeKeys; import com.urbanspork.common.transport.udp.DatagramPacketWrapper; import com.urbanspork.common.transport.udp.PacketEncoding; import io.netty.bootstrap.Bootstrap; @@ -19,14 +20,15 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; public class ServerUdpRelayHandler extends SimpleChannelInboundHandler { - private static final Logger logger = LoggerFactory.getLogger(ServerUdpRelayHandler.class); - private final Map workerChannels = new ConcurrentHashMap<>(); + private final Map workerChannels = new ConcurrentHashMap<>(); private final EventLoopGroup workerGroup; private final PacketEncoding packetEncoding; + private Channel packetWorkerChannel; public ServerUdpRelayHandler(PacketEncoding packetEncoding, EventLoopGroup workerGroup) { super(false); @@ -39,7 +41,7 @@ public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) { Channel channel = ctx.channel(); InetSocketAddress sender = msg.sender(); InetSocketAddress recipient = msg.recipient(); - Channel workerChannel = workerChannel(sender, channel); + Channel workerChannel = workerChannel(channel); logger.info("[udp][relay]{}→{}~{}→{}", sender, recipient, channel.localAddress(), workerChannel.localAddress()); workerChannel.writeAndFlush(msg); } @@ -49,17 +51,36 @@ public void handlerRemoved(ChannelHandlerContext ctx) { for (Map.Entry entry : workerChannels.entrySet()) { entry.getValue().close(); } + if (packetWorkerChannel != null) { + packetWorkerChannel.close(); + } } - Channel workerChannel(InetSocketAddress key, Channel inboundChannel) { + Channel workerChannel(Channel inboundChannel) { if (PacketEncoding.Packet == packetEncoding) { - key = PacketEncoding.Packet.seqPacketMagicAddress(); + if (packetWorkerChannel == null) { + packetWorkerChannel = newWorkerChannel0(inboundChannel); + } + return packetWorkerChannel; } + Object key = Objects.requireNonNull(inboundChannel.attr(AttributeKeys.SERVER_UDP_RELAY_WORKER).get(), "require channel attribute: " + AttributeKeys.SERVER_UDP_RELAY_WORKER); return workerChannels.computeIfAbsent(key, k -> newWorkerChannel(k, inboundChannel)); } - private Channel newWorkerChannel(InetSocketAddress key, Channel channel) { - Channel workerChannel = new Bootstrap().group(workerGroup).channel(NioDatagramChannel.class) + private Channel newWorkerChannel(Object key, Channel channel) { + Channel workerChannel = newWorkerChannel0(channel); + workerChannel.closeFuture().addListener(future -> { + Channel removed = workerChannels.remove(key); + logger.info("[udp][binding]{} != {}", key, removed); + }); + logger.info("[udp][binding]{} == {}", key, workerChannel); + return workerChannel; + } + + // callback->server->client + private Channel newWorkerChannel0(Channel channel) { + // automatically assigned port now, may have security implications + return new Bootstrap().group(workerGroup).channel(NioDatagramChannel.class) .handler(new ChannelInitializer<>() { @Override protected void initChannel(Channel ch) { @@ -68,16 +89,10 @@ protected void initChannel(Channel ch) { new InboundHandler(channel) ); } - })// callback->server->client + }) // callback->server->client .bind(0) // automatically assigned port now, may have security implications .syncUninterruptibly() .channel(); - workerChannel.closeFuture().addListener(future -> { - Channel removed = workerChannels.remove(key); - logger.info("[udp][binding]{} != {}", key, removed); - }); - logger.info("[udp][binding]{} == {}", key, workerChannel); - return workerChannel; } private static class InboundHandler extends MessageToMessageCodec { diff --git a/urban-spork-server/src/com/urbanspork/server/trojan/ServerPacketCodec.java b/urban-spork-server/src/com/urbanspork/server/trojan/ServerPacketCodec.java index 71f565b..1057c49 100644 --- a/urban-spork-server/src/com/urbanspork/server/trojan/ServerPacketCodec.java +++ b/urban-spork-server/src/com/urbanspork/server/trojan/ServerPacketCodec.java @@ -1,5 +1,6 @@ package com.urbanspork.server.trojan; +import com.urbanspork.common.channel.AttributeKeys; import com.urbanspork.common.protocol.socks.Address; import com.urbanspork.common.protocol.trojan.Trojan; import com.urbanspork.common.transport.udp.DatagramPacketWrapper; @@ -34,6 +35,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) short length = msg.readShort(); msg.skipBytes(Trojan.CRLF.length); ByteBuf content = msg.readBytes(length); + ctx.channel().attr(AttributeKeys.SERVER_UDP_RELAY_WORKER).set(address); out.add(new DatagramPacket(content, recipient, address)); } } diff --git a/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/EmbeddedChannelTest.java b/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/EmbeddedChannelTest.java index b137f47..a143a95 100644 --- a/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/EmbeddedChannelTest.java +++ b/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/EmbeddedChannelTest.java @@ -11,6 +11,7 @@ import com.urbanspork.common.transport.udp.DatagramPacketWrapper; import com.urbanspork.common.util.Dice; import com.urbanspork.test.TestDice; +import com.urbanspork.test.template.TraceLevelLoggerTestTemplate; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; @@ -23,7 +24,7 @@ import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; -class EmbeddedChannelTest { +class EmbeddedChannelTest extends TraceLevelLoggerTestTemplate { @Test void testTcpRelayChannel() { int port = TestDice.rollPort(); @@ -109,4 +110,9 @@ void testAead2022UdpAntiReplay() { client.close(); server.close(); } + + @Override + protected Class loggerClass() { + return UdpRelayCodec.class; + } } diff --git a/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecTest.java b/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecTest.java index 4a81315..c92e8c8 100644 --- a/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecTest.java +++ b/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecTest.java @@ -37,7 +37,7 @@ void testIncorrectPassword() { void testTooShortHeader() { AeadCipherCodec codec = newAEADCipherCodec(); ByteBuf in = Unpooled.wrappedBuffer(Dice.rollBytes(3)); - Context context = new Context(Mode.Client, new Control(TestDice.rollCipher()), null, ServerUserManager.empty()); + Context context = new Context(Mode.Client, new Control(), null, ServerUserManager.empty()); Assertions.assertThrows(DecoderException.class, () -> codec.decode(context, in)); } @@ -51,10 +51,9 @@ void testEmptyMsg(Mode from, Mode to) throws InvalidCipherTextException { AeadCipherCodec codec = newAEADCipherCodec(); InetSocketAddress address = InetSocketAddress.createUnresolved(TestDice.rollHost(), TestDice.rollPort()); ByteBuf in = Unpooled.buffer(); - CipherKind kind = TestDice.rollCipher(); - codec.encode(new Context(from, new Control(kind), address, ServerUserManager.empty()), Unpooled.EMPTY_BUFFER, in); + codec.encode(new Context(from, new Control(), address, ServerUserManager.empty()), Unpooled.EMPTY_BUFFER, in); Assertions.assertTrue(in.isReadable()); - RelayingPacket pocket = codec.decode(new Context(to, new Control(kind), address, ServerUserManager.empty()), in); + RelayingPacket pocket = codec.decode(new Context(to, new Control(), address, ServerUserManager.empty()), in); Assertions.assertFalse(in.isReadable()); Assertions.assertNotNull(pocket); } @@ -63,18 +62,18 @@ void testEmptyMsg(Mode from, Mode to) throws InvalidCipherTextException { void testTooShortPacket() { AeadCipherCodec codec = newAEADCipherCodec(); ByteBuf in = Unpooled.buffer(); - Context c1 = new Context(Mode.Client, new Control(CipherKind.aead2022_blake3_aes_128_gcm), null, ServerUserManager.empty()); + Context c1 = new Context(Mode.Client, new Control(), null, ServerUserManager.empty()); Assertions.assertThrows(DecoderException.class, () -> codec.decode(c1, in)); - Context c2 = new Context(Mode.Server, new Control(CipherKind.aead2022_blake3_aes_128_gcm), null, ServerUserManager.empty()); + Context c2 = new Context(Mode.Server, new Control(), null, ServerUserManager.empty()); Assertions.assertThrows(DecoderException.class, () -> codec.decode(c2, in)); } @Test void testInvalidSocketType() throws InvalidCipherTextException { InetSocketAddress address = InetSocketAddress.createUnresolved(TestDice.rollHost(), TestDice.rollPort()); - Context c1 = new Context(Mode.Client, new Control(CipherKind.aead2022_blake3_aes_128_gcm), address, ServerUserManager.empty()); + Context c1 = new Context(Mode.Client, new Control(), address, ServerUserManager.empty()); testInvalidSocketType(c1); - Context c2 = new Context(Mode.Server, new Control(CipherKind.aead2022_blake3_aes_128_gcm), address, ServerUserManager.empty()); + Context c2 = new Context(Mode.Server, new Control(), address, ServerUserManager.empty()); testInvalidSocketType(c2); } diff --git a/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecsTest.java b/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecsTest.java index 5cde0bc..234154a 100644 --- a/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecsTest.java +++ b/urban-spork-test/test/com/urbanspork/common/codec/shadowsocks/udp/AeadCipherCodecsTest.java @@ -51,7 +51,7 @@ void testByKind(CipherKind kind) throws Exception { int port = TestDice.rollPort(); String host = TestDice.rollHost(); InetSocketAddress address = InetSocketAddress.createUnresolved(host, port); - cipherTest(new Context(Mode.Client, new Control(kind), address, ServerUserManager.empty()), new Context(Mode.Server, new Control(kind), null, ServerUserManager.empty())); + cipherTest(new Context(Mode.Client, new Control(), address, ServerUserManager.empty()), new Context(Mode.Server, new Control(), null, ServerUserManager.empty())); } private void cipherTest(Context request, Context response) throws Exception { diff --git a/urban-spork-test/test/com/urbanspork/common/protocol/shadowsocks/ControlTest.java b/urban-spork-test/test/com/urbanspork/common/protocol/shadowsocks/ControlTest.java index 030daad..b072221 100644 --- a/urban-spork-test/test/com/urbanspork/common/protocol/shadowsocks/ControlTest.java +++ b/urban-spork-test/test/com/urbanspork/common/protocol/shadowsocks/ControlTest.java @@ -1,6 +1,5 @@ package com.urbanspork.common.protocol.shadowsocks; -import com.urbanspork.common.util.Dice; import com.urbanspork.test.TestUtil; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -10,7 +9,7 @@ class ControlTest { @Test void testIncreasePacketId() { - Control control = new Control(null, 1, 1, Long.MAX_VALUE); + Control control = new Control(1, 1, Long.MAX_VALUE); control.increasePacketId(1); Assertions.assertEquals(0, control.getPacketId()); Assertions.assertNotEquals(1, control.getClientSessionId()); @@ -18,12 +17,10 @@ void testIncreasePacketId() { @Test void testGetterAndSetter() { - byte[] salt = Dice.rollBytes(32); long id = ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE); - Control control = new Control(salt, 0, 0, 0); + Control control = new Control(0, 0, 0); TestUtil.testGetterAndSetter(id, control, Control::getClientSessionId, Control::setClientSessionId); TestUtil.testGetterAndSetter(id, control, Control::getServerSessionId, Control::setServerSessionId); TestUtil.testGetterAndSetter(id, control, Control::getPacketId, Control::setPacketId); - Assertions.assertArrayEquals(salt, control.salt()); } } diff --git a/urban-spork-test/test/com/urbanspork/common/protocol/shadowsocks/aead2022/Aead2022Test.java b/urban-spork-test/test/com/urbanspork/common/protocol/shadowsocks/aead2022/Aead2022Test.java index d6a27ce..636d7c2 100644 --- a/urban-spork-test/test/com/urbanspork/common/protocol/shadowsocks/aead2022/Aead2022Test.java +++ b/urban-spork-test/test/com/urbanspork/common/protocol/shadowsocks/aead2022/Aead2022Test.java @@ -97,7 +97,7 @@ void testUdpUserNotFound() throws InvalidCipherTextException { in.writeCharSequence(msg, StandardCharsets.US_ASCII); ByteBuf out = Unpooled.buffer(); AEAD2022.UDP.encodePacket(AEAD2022.UDP.getCipher(kind, method, iPSK, sessionId), iPSK, 16, in, out); - Control control = new Control(kind); + Control control = new Control(); Assertions.assertThrows(DecoderException.class, () -> AEAD2022.UDP.decodePacket(kind, method, control, userManager, iPSK, out)); userManager.removeUserByHash(user.identityHash()); } diff --git a/urban-spork-test/test/com/urbanspork/server/ServerUdpRelayHandlerTest.java b/urban-spork-test/test/com/urbanspork/server/ServerUdpRelayHandlerTest.java index 765e812..32bd3aa 100644 --- a/urban-spork-test/test/com/urbanspork/server/ServerUdpRelayHandlerTest.java +++ b/urban-spork-test/test/com/urbanspork/server/ServerUdpRelayHandlerTest.java @@ -1,5 +1,6 @@ package com.urbanspork.server; +import com.urbanspork.common.channel.AttributeKeys; import com.urbanspork.common.transport.udp.PacketEncoding; import com.urbanspork.test.TestDice; import io.netty.buffer.Unpooled; @@ -23,9 +24,13 @@ class ServerUdpRelayHandlerTest { void testWorkAndIdle(PacketEncoding packetEncoding) throws Exception { NioEventLoopGroup group = new NioEventLoopGroup(); ServerUdpRelayHandler handler = new ServerUdpRelayHandler(packetEncoding, group); - handler.workerChannel(new InetSocketAddress(TestDice.rollPort()), new EmbeddedChannel(handler)); + EmbeddedChannel embeddedChannel = new EmbeddedChannel(handler); + embeddedChannel.attr(AttributeKeys.SERVER_UDP_RELAY_WORKER).set(new InetSocketAddress(TestDice.rollPort())); + handler.workerChannel(embeddedChannel); InetSocketAddress recipient = new InetSocketAddress(TestDice.rollPort()); - Channel outboundChannel = handler.workerChannel(recipient, new EmbeddedChannel(new ServerUdpRelayHandler(packetEncoding, group))); + EmbeddedChannel inboundChannel = new EmbeddedChannel(new ServerUdpRelayHandler(packetEncoding, group)); + inboundChannel.attr(AttributeKeys.SERVER_UDP_RELAY_WORKER).set(recipient); + Channel outboundChannel = handler.workerChannel(inboundChannel); Assertions.assertNotNull(outboundChannel); ChannelPipeline workerPipeline = outboundChannel.pipeline(); ChannelInboundHandlerAdapter last = (ChannelInboundHandlerAdapter) workerPipeline.last();