From 972826846f28a72b89e923761dcfd93ddac92bbf Mon Sep 17 00:00:00 2001 From: Zmax0 Date: Mon, 5 Feb 2024 10:39:45 +0800 Subject: [PATCH] refactor: add a record class instead of `Map.Entry` --- .../client/gui/console/component/Proxy.java | 11 ++--- .../src/com/urbanspork/client/Client.java | 23 +++++----- .../src/com/urbanspork/server/Server.java | 33 +++++++------- .../client/ClientSocksHandshakeTestCase.java | 11 ++--- .../com/urbanspork/client/ClientTestCase.java | 13 +++--- .../com/urbanspork/server/ServerTestCase.java | 16 +++---- .../test/com/urbanspork/test/TCPTestCase.java | 30 ++++++------- .../test/com/urbanspork/test/UdpTestCase.java | 24 +++++----- .../test/template/TestTemplate.java | 20 +++++---- .../test/template/UdpTestTemplate.java | 44 +++++++++---------- 10 files changed, 104 insertions(+), 121 deletions(-) diff --git a/urban-spork-client-gui/src/com/urbanspork/client/gui/console/component/Proxy.java b/urban-spork-client-gui/src/com/urbanspork/client/gui/console/component/Proxy.java index e303743..cb66524 100644 --- a/urban-spork-client-gui/src/com/urbanspork/client/gui/console/component/Proxy.java +++ b/urban-spork-client-gui/src/com/urbanspork/client/gui/console/component/Proxy.java @@ -4,11 +4,8 @@ import com.urbanspork.client.gui.Resource; import com.urbanspork.common.config.ClientConfig; import com.urbanspork.common.config.ServerConfig; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.ServerSocketChannel; import java.awt.TrayIcon.MessageType; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -20,7 +17,7 @@ public class Proxy { private static final ExecutorService executor = Executors.newSingleThreadExecutor(); - private static Map.Entry client; + private static Client.Instance client; private Proxy() {} @@ -31,9 +28,9 @@ public static void launch() { return; } if (client != null) { - Client.close(client); + client.close(); } - CompletableFuture> promise = new CompletableFuture<>(); + CompletableFuture promise = new CompletableFuture<>(); executor.submit(() -> Client.launch(config, promise)); try { client = promise.get(); @@ -50,7 +47,7 @@ public static void launch() { } public static void exit() { - Client.close(client); + client.close(); executor.shutdown(); } } diff --git a/urban-spork-client/src/com/urbanspork/client/Client.java b/urban-spork-client/src/com/urbanspork/client/Client.java index f824bf5..c63bb96 100644 --- a/urban-spork-client/src/com/urbanspork/client/Client.java +++ b/urban-spork-client/src/com/urbanspork/client/Client.java @@ -26,7 +26,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.Map; import java.util.concurrent.CompletableFuture; public class Client { @@ -37,7 +36,7 @@ public static void main(String[] args) { launch(ConfigHandler.DEFAULT.read(), new CompletableFuture<>()); } - public static void launch(ClientConfig config, CompletableFuture> promise) { + public static void launch(ClientConfig config, CompletableFuture promise) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { @@ -54,13 +53,13 @@ public static void launch(ClientConfig config, CompletableFuture tcp{} udp{} ", tcpLocalAddress, udp.localAddress()); - Map.Entry client = Map.entry(tcp, udp); + Instance client = new Instance(tcp, udp); promise.complete(client); }); - Map.Entry client = promise.get(); + Instance client = promise.get(); CompletableFuture.allOf( - CompletableFuture.supplyAsync(() -> client.getKey().closeFuture().syncUninterruptibly()), - CompletableFuture.supplyAsync(() -> client.getValue().closeFuture().syncUninterruptibly()) + CompletableFuture.supplyAsync(() -> client.tcp().closeFuture().syncUninterruptibly()), + CompletableFuture.supplyAsync(() -> client.udp().closeFuture().syncUninterruptibly()) ).get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -73,11 +72,6 @@ public static void launch(ClientConfig config, CompletableFuture client) { - client.getKey().close().awaitUninterruptibly(); - client.getValue().close().awaitUninterruptibly(); - } - private static DatagramChannel launchUdp(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ClientConfig config) throws InterruptedException { ServerConfig current = config.getCurrent(); ChannelHandler udpTransportHandler; @@ -99,4 +93,11 @@ protected void initChannel(Channel ch) { }) .bind(InetAddress.getLoopbackAddress(), config.getPort()).sync().channel(); } + + public record Instance(ServerSocketChannel tcp, DatagramChannel udp) { + public void close() { + tcp.close().awaitUninterruptibly(); + udp.close().awaitUninterruptibly(); + } + } } diff --git a/urban-spork-server/src/com/urbanspork/server/Server.java b/urban-spork-server/src/com/urbanspork/server/Server.java index 0f18fe0..8259522 100644 --- a/urban-spork-server/src/com/urbanspork/server/Server.java +++ b/urban-spork-server/src/com/urbanspork/server/Server.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -49,21 +48,21 @@ public static void launch(List configs) { launch(configs, new CompletableFuture<>()); } - public static void launch(List configs, CompletableFuture>>> promise) { + public static void launch(List configs, CompletableFuture> promise) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { - List>> servers = new ArrayList<>(configs.size()); + List servers = new ArrayList<>(configs.size()); int count = 0; for (ServerConfig config : configs) { - Map.Entry> server = startup(bossGroup, workerGroup, config); - count += server.getValue().isPresent() ? 2 : 1; + Instance server = startup(bossGroup, workerGroup, config); + count += server.udp().isPresent() ? 2 : 1; servers.add(server); } CountDownLatch latch = new CountDownLatch(count); - for (Map.Entry> server : servers) { - server.getKey().closeFuture().addListener(future -> latch.countDown()); - server.getValue().ifPresent(v -> v.closeFuture().addListener(future -> latch.countDown())); + for (Instance server : servers) { + server.tcp().closeFuture().addListener(future -> latch.countDown()); + server.udp().ifPresent(v -> v.closeFuture().addListener(future -> latch.countDown())); } promise.complete(servers); latch.await(); // main thread is waiting here @@ -80,14 +79,7 @@ public static void launch(List configs, CompletableFuture>> servers) { - for (Map.Entry> entry : servers) { - entry.getKey().close().awaitUninterruptibly(); - entry.getValue().ifPresent(c -> c.close().awaitUninterruptibly()); - } - } - - private static Map.Entry> startup(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ServerConfig config) + private static Instance startup(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ServerConfig config) throws InterruptedException { if (Protocols.shadowsocks == config.getProtocol()) { List user = config.getUser(); @@ -101,7 +93,7 @@ private static Map.Entry> startup .bind(config.getPort()).sync().addListener(future -> logger.info("Startup tcp server => {}", config)).channel(); config.setPort(tcp.localAddress().getPort()); Optional udp = startupUdp(bossGroup, workerGroup, config); - return Map.entry(tcp, udp); + return new Instance(tcp, udp); } private static Optional startupUdp(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ServerConfig config) throws InterruptedException { @@ -124,4 +116,11 @@ protected void initChannel(Channel ch) { return Optional.empty(); } } + + public record Instance(ServerSocketChannel tcp, Optional udp) { + public void close() { + tcp.close().awaitUninterruptibly(); + udp.ifPresent(c -> c.close().awaitUninterruptibly()); + } + } } diff --git a/urban-spork-test/test/com/urbanspork/client/ClientSocksHandshakeTestCase.java b/urban-spork-test/test/com/urbanspork/client/ClientSocksHandshakeTestCase.java index 23c7f6f..034c5f9 100644 --- a/urban-spork-test/test/com/urbanspork/client/ClientSocksHandshakeTestCase.java +++ b/urban-spork-test/test/com/urbanspork/client/ClientSocksHandshakeTestCase.java @@ -7,8 +7,6 @@ import com.urbanspork.test.TestDice; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.ServerSocketChannel; import io.netty.handler.codec.socksx.v5.Socks5CommandType; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; @@ -17,7 +15,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -30,21 +27,21 @@ class ClientSocksHandshakeTestCase { void testUdpEnable() throws InterruptedException, ExecutionException { ClientConfig config = ClientConfigTestCase.testConfig(0, 0); config.getServers().getFirst().setProtocol(Protocols.vmess); - Map.Entry client = ClientTestCase.asyncLaunchClient(config); + Client.Instance client = ClientTestCase.asyncLaunchClient(config); InetSocketAddress proxyAddress = new InetSocketAddress(config.getPort()); InetSocketAddress dstAddress1 = new InetSocketAddress(InetAddress.getLoopbackAddress(), TestDice.rollPort()); assertFailedHandshake(proxyAddress, dstAddress1); - Client.close(client); + client.close(); } @Test void testIllegalDstAddress() throws InterruptedException, ExecutionException { ClientConfig config = ClientConfigTestCase.testConfig(0, 0); - Map.Entry client = ClientTestCase.asyncLaunchClient(config); + Client.Instance client = ClientTestCase.asyncLaunchClient(config); InetSocketAddress proxyAddress = new InetSocketAddress(config.getPort()); InetSocketAddress dstAddress1 = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); assertFailedHandshake(proxyAddress, dstAddress1); - Client.close(client); + client.close(); } diff --git a/urban-spork-test/test/com/urbanspork/client/ClientTestCase.java b/urban-spork-test/test/com/urbanspork/client/ClientTestCase.java index 89e8097..9378439 100644 --- a/urban-spork-test/test/com/urbanspork/client/ClientTestCase.java +++ b/urban-spork-test/test/com/urbanspork/client/ClientTestCase.java @@ -6,8 +6,6 @@ import com.urbanspork.common.protocol.socks.ClientHandshake; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.ServerSocketChannel; import io.netty.handler.codec.socksx.v5.Socks5CommandType; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; @@ -24,7 +22,6 @@ import java.net.InetSocketAddress; import java.util.Arrays; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -64,10 +61,10 @@ void testExit() { @Test @Order(2) void testLaunchFailed() throws InterruptedException, ExecutionException { - ServerSocketChannel c1 = asyncLaunchClient(ClientConfigTestCase.testConfig(0, 0)).getKey(); - ClientConfig config = ClientConfigTestCase.testConfig(c1.localAddress().getPort(), 0); + Client.Instance client = asyncLaunchClient(ClientConfigTestCase.testConfig(0, 0)); + ClientConfig config = ClientConfigTestCase.testConfig(client.tcp().localAddress().getPort(), 0); Assertions.assertThrows(ExecutionException.class, () -> asyncLaunchClient(config)); - c1.close(); + client.close(); } @ParameterizedTest @@ -79,8 +76,8 @@ void testHandshake(Socks5CommandType type) { Assertions.assertThrows(ExecutionException.class, () -> ClientHandshake.noAuth(group, type, proxyAddress, dstAddress).get(10, TimeUnit.SECONDS)); } - public static Map.Entry asyncLaunchClient(ClientConfig config) throws InterruptedException, ExecutionException { - CompletableFuture> promise = new CompletableFuture<>(); + public static Client.Instance asyncLaunchClient(ClientConfig config) throws InterruptedException, ExecutionException { + CompletableFuture promise = new CompletableFuture<>(); SERVICE.submit(() -> Client.launch(config, promise)); return promise.get(); } diff --git a/urban-spork-test/test/com/urbanspork/server/ServerTestCase.java b/urban-spork-test/test/com/urbanspork/server/ServerTestCase.java index b73aa5d..6204de6 100644 --- a/urban-spork-test/test/com/urbanspork/server/ServerTestCase.java +++ b/urban-spork-test/test/com/urbanspork/server/ServerTestCase.java @@ -10,9 +10,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; -import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LoggingHandler; @@ -24,8 +22,6 @@ import java.net.InetSocketAddress; import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -52,7 +48,7 @@ void launchRejected() { void launchFailed() { int port = TestDice.rollPort(); List configs = ServerConfigTestCase.testConfigs(port, port); - CompletableFuture>>> promise = new CompletableFuture<>(); + CompletableFuture> promise = new CompletableFuture<>(); Server.launch(configs, promise); Assertions.assertTrue(promise.isCompletedExceptionally()); } @@ -78,9 +74,9 @@ void sendInvalidUDP() throws InterruptedException, ExecutionException { ServerConfig config = ServerConfigTestCase.testConfig(0); config.setNetworks(new Network[]{Network.TCP, Network.UDP}); try (ExecutorService service = Executors.newVirtualThreadPerTaskExecutor()) { - CompletableFuture>>> promise = new CompletableFuture<>(); + CompletableFuture> promise = new CompletableFuture<>(); service.submit(() -> Server.launch(List.of(config), promise)); - List>> servers = promise.get(); + List servers = promise.get(); InetSocketAddress serverAddress = new InetSocketAddress(config.getHost(), config.getPort()); Channel channel = new Bootstrap().group(new NioEventLoopGroup()) .channel(NioDatagramChannel.class) @@ -89,7 +85,9 @@ void sendInvalidUDP() throws InterruptedException, ExecutionException { ChannelFuture future = channel.writeAndFlush(new DatagramPacket(Unpooled.wrappedBuffer(Dice.rollBytes(512)), serverAddress)).sync(); future.get(); Assertions.assertTrue(future.isDone()); - Server.close(servers); + for (Server.Instance server : servers) { + server.close(); + } } } @@ -97,7 +95,7 @@ void sendInvalidUDP() throws InterruptedException, ExecutionException { void sendInvalidTCP() throws InterruptedException, ExecutionException { ServerConfig config = ServerConfigTestCase.testConfig(0); try (ExecutorService service = Executors.newVirtualThreadPerTaskExecutor()) { - CompletableFuture>>> promise = new CompletableFuture<>(); + CompletableFuture> promise = new CompletableFuture<>(); Future server = service.submit(() -> Server.launch(List.of(config), promise)); promise.get(); Channel channel = new Bootstrap().group(new NioEventLoopGroup()) diff --git a/urban-spork-test/test/com/urbanspork/test/TCPTestCase.java b/urban-spork-test/test/com/urbanspork/test/TCPTestCase.java index 19fc40a..a90c163 100644 --- a/urban-spork-test/test/com/urbanspork/test/TCPTestCase.java +++ b/urban-spork-test/test/com/urbanspork/test/TCPTestCase.java @@ -12,8 +12,6 @@ import com.urbanspork.server.Server; import com.urbanspork.test.template.Parameter; import com.urbanspork.test.template.TCPTestTemplate; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.ServerSocketChannel; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -22,8 +20,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutionException; @DisplayName("TCP") @@ -41,19 +37,19 @@ void testByParameter(Parameter parameter) throws ExecutionException, Interrupted serverConfig.setProtocol(protocol); serverConfig.setCipher(cipher); serverConfig.setPassword(parameter.serverPassword()); - List>> server = launchServer(config.getServers()); - Map.Entry client = launchClient(config); - handshakeAndSendBytes(client.getKey().localAddress()); - Server.close(server); - Client.close(client); + List server = launchServer(config.getServers()); + Client.Instance client = launchClient(config); + handshakeAndSendBytes(client.tcp().localAddress()); + closeServer(server); + client.close(); } @Test void testConnectServerFailed() throws ExecutionException, InterruptedException { ClientConfig config = ClientConfigTestCase.testConfig(0, TestDice.rollPort()); - Map.Entry client = launchClient(config); - Assertions.assertThrows(ExecutionException.class, () -> handshakeAndSendBytes(client.getKey().localAddress())); - Client.close(client); + Client.Instance client = launchClient(config); + Assertions.assertThrows(ExecutionException.class, () -> handshakeAndSendBytes(client.tcp().localAddress())); + client.close(); } void testShadowsocksAEAD2022EihByParameter(Parameter parameter) throws ExecutionException, InterruptedException { @@ -66,16 +62,16 @@ void testShadowsocksAEAD2022EihByParameter(Parameter parameter) throws Execution List user = new ArrayList<>(); user.add(new ServerUserConfig(TestDice.rollString(10), parameter.clientPassword())); serverConfig.setUser(user); - List>> server = launchServer(List.of(serverConfig)); + List server = launchServer(List.of(serverConfig)); ClientConfig config = ClientConfigTestCase.testConfig(0, serverConfig.getPort()); ServerConfig current = config.getCurrent(); current.setCipher(cipher); current.setProtocol(protocol); current.setPassword(parameter.serverPassword() + ":" + parameter.clientPassword()); - Map.Entry client = launchClient(config); - handshakeAndSendBytes(client.getKey().localAddress()); + Client.Instance client = launchClient(config); + handshakeAndSendBytes(client.udp().localAddress()); ServerUserManager.DEFAULT.clear(); - Server.close(server); - Client.close(client); + closeServer(server); + client.close(); } } diff --git a/urban-spork-test/test/com/urbanspork/test/UdpTestCase.java b/urban-spork-test/test/com/urbanspork/test/UdpTestCase.java index 11573f0..6bdeec7 100644 --- a/urban-spork-test/test/com/urbanspork/test/UdpTestCase.java +++ b/urban-spork-test/test/com/urbanspork/test/UdpTestCase.java @@ -13,8 +13,6 @@ import com.urbanspork.server.Server; import com.urbanspork.test.template.Parameter; import com.urbanspork.test.template.UdpTestTemplate; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.ServerSocketChannel; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -22,8 +20,6 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -44,12 +40,12 @@ void testByParameter(Parameter parameter) throws ExecutionException, Interrupted serverConfig.setProtocol(protocol); serverConfig.setCipher(cipher); serverConfig.setPassword(parameter.serverPassword()); - List>> server = launchServer(config.getServers()); - Map.Entry client = launchClient(config); - InetSocketAddress clientLocalAddress = client.getKey().localAddress(); + List server = launchServer(config.getServers()); + Client.Instance client = launchClient(config); + InetSocketAddress clientLocalAddress = client.tcp().localAddress(); handshakeAndSendBytes(clientLocalAddress); - Server.close(server); - Client.close(client); + closeServer(server); + client.close(); } void testShadowsocksAEAD2022EihByParameter(Parameter parameter) throws ExecutionException, InterruptedException, TimeoutException { @@ -64,18 +60,18 @@ void testShadowsocksAEAD2022EihByParameter(Parameter parameter) throws Execution List user = new ArrayList<>(); user.add(new ServerUserConfig(TestDice.rollString(10), parameter.clientPassword())); serverConfig.setUser(user); - List>> server = launchServer(List.of(serverConfig)); + List server = launchServer(List.of(serverConfig)); ClientConfig clientConfig = ClientConfigTestCase.testConfig(0, serverConfig.getPort()); ServerConfig current = clientConfig.getCurrent(); current.setCipher(cipher); current.setNetworks(networks); current.setProtocol(protocol); current.setPassword(parameter.serverPassword() + ":" + parameter.clientPassword()); - Map.Entry client = launchClient(clientConfig); - InetSocketAddress clientLocalAddress = client.getKey().localAddress(); + Client.Instance client = launchClient(clientConfig); + InetSocketAddress clientLocalAddress = client.tcp().localAddress(); handshakeAndSendBytes(clientLocalAddress); ServerUserManager.DEFAULT.clear(); - Server.close(server); - Client.close(client); + closeServer(server); + client.close(); } } diff --git a/urban-spork-test/test/com/urbanspork/test/template/TestTemplate.java b/urban-spork-test/test/com/urbanspork/test/template/TestTemplate.java index 5bb2c81..5ec7e19 100644 --- a/urban-spork-test/test/com/urbanspork/test/template/TestTemplate.java +++ b/urban-spork-test/test/com/urbanspork/test/template/TestTemplate.java @@ -4,32 +4,34 @@ import com.urbanspork.common.config.ClientConfig; import com.urbanspork.common.config.ServerConfig; import com.urbanspork.server.Server; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.ServerSocketChannel; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -public class TestTemplate { +abstract class TestTemplate { protected static final ExecutorService POOL = Executors.newVirtualThreadPerTaskExecutor(); - protected static Map.Entry launchClient(ClientConfig config) + protected static Client.Instance launchClient(ClientConfig config) throws InterruptedException, ExecutionException { - CompletableFuture> promise = new CompletableFuture<>(); + CompletableFuture promise = new CompletableFuture<>(); POOL.submit(() -> Client.launch(config, promise)); return promise.get(); } - protected static List>> launchServer(List configs) + protected static List launchServer(List configs) throws InterruptedException, ExecutionException { - CompletableFuture>>> promise = new CompletableFuture<>(); + CompletableFuture> promise = new CompletableFuture<>(); POOL.submit(() -> Server.launch(configs, promise)); return promise.get(); } + + protected static void closeServer(List servers) { + for (Server.Instance server : servers) { + server.close(); + } + } } diff --git a/urban-spork-test/test/com/urbanspork/test/template/UdpTestTemplate.java b/urban-spork-test/test/com/urbanspork/test/template/UdpTestTemplate.java index 6fca9e2..c53c19a 100644 --- a/urban-spork-test/test/com/urbanspork/test/template/UdpTestTemplate.java +++ b/urban-spork-test/test/com/urbanspork/test/template/UdpTestTemplate.java @@ -94,6 +94,28 @@ private void launchUDPTestServer() { } } + private void initChannel() { + channel = new Bootstrap().group(group) + .channel(NioDatagramChannel.class) + .handler(new ChannelInitializer<>() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast( + new DatagramPacketEncoder(), + new DatagramPacketDecoder(), + new SimpleChannelInboundHandler(false) { + @Override + protected void channelRead0(ChannelHandlerContext ctx, TernaryDatagramPacket msg) { + logger.info("Receive msg {}", msg); + consumer.accept(msg); + } + } + ); + } + }) + .bind(0).syncUninterruptibly().channel(); + } + protected void handshakeAndSendBytes(InetSocketAddress proxyAddress) throws InterruptedException, ExecutionException, TimeoutException { for (InetSocketAddress address : dstAddress) { handshakeAndSendBytes(proxyAddress, address); @@ -129,26 +151,4 @@ void shutdown() { channel.close(); group.shutdownGracefully(); } - - private void initChannel() { - channel = new Bootstrap().group(group) - .channel(NioDatagramChannel.class) - .handler(new ChannelInitializer<>() { - @Override - protected void initChannel(Channel ch) { - ch.pipeline().addLast( - new DatagramPacketEncoder(), - new DatagramPacketDecoder(), - new SimpleChannelInboundHandler(false) { - @Override - protected void channelRead0(ChannelHandlerContext ctx, TernaryDatagramPacket msg) { - logger.info("Receive msg {}", msg); - consumer.accept(msg); - } - } - ); - } - }) - .bind(0).syncUninterruptibly().channel(); - } }