Skip to content

Commit

Permalink
refactor: add a record class instead of Map.Entry
Browse files Browse the repository at this point in the history
  • Loading branch information
Zmax0 committed Feb 5, 2024
1 parent d122237 commit 9728268
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
import com.urbanspork.client.gui.Resource;
import com.urbanspork.common.config.ClientConfig;
import com.urbanspork.common.config.ServerConfig;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;

import java.awt.TrayIcon.MessageType;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -20,7 +17,7 @@ public class Proxy {

private static final ExecutorService executor = Executors.newSingleThreadExecutor();

private static Map.Entry<ServerSocketChannel, DatagramChannel> client;
private static Client.Instance client;

private Proxy() {}

Expand All @@ -31,9 +28,9 @@ public static void launch() {
return;
}
if (client != null) {
Client.close(client);
client.close();
}
CompletableFuture<Map.Entry<ServerSocketChannel, DatagramChannel>> promise = new CompletableFuture<>();
CompletableFuture<Client.Instance> promise = new CompletableFuture<>();
executor.submit(() -> Client.launch(config, promise));
try {
client = promise.get();
Expand All @@ -50,7 +47,7 @@ public static void launch() {
}

public static void exit() {
Client.close(client);
client.close();
executor.shutdown();
}
}
23 changes: 12 additions & 11 deletions urban-spork-client/src/com/urbanspork/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class Client {
Expand All @@ -37,7 +36,7 @@ public static void main(String[] args) {
launch(ConfigHandler.DEFAULT.read(), new CompletableFuture<>());
}

public static void launch(ClientConfig config, CompletableFuture<Map.Entry<ServerSocketChannel, DatagramChannel>> promise) {
public static void launch(ClientConfig config, CompletableFuture<Instance> promise) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Expand All @@ -54,13 +53,13 @@ public static void launch(ClientConfig config, CompletableFuture<Map.Entry<Serve
config.setPort(localPort);
DatagramChannel udp = launchUdp(bossGroup, workerGroup, config);
logger.info("Launch client => tcp{} udp{} ", tcpLocalAddress, udp.localAddress());
Map.Entry<ServerSocketChannel, DatagramChannel> client = Map.entry(tcp, udp);
Instance client = new Instance(tcp, udp);
promise.complete(client);
});
Map.Entry<ServerSocketChannel, DatagramChannel> client = promise.get();
Instance client = promise.get();
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> client.getKey().closeFuture().syncUninterruptibly()),
CompletableFuture.supplyAsync(() -> client.getValue().closeFuture().syncUninterruptibly())
CompletableFuture.supplyAsync(() -> client.tcp().closeFuture().syncUninterruptibly()),
CompletableFuture.supplyAsync(() -> client.udp().closeFuture().syncUninterruptibly())
).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -73,11 +72,6 @@ public static void launch(ClientConfig config, CompletableFuture<Map.Entry<Serve
}
}

public static void close(Map.Entry<ServerSocketChannel, DatagramChannel> client) {
client.getKey().close().awaitUninterruptibly();
client.getValue().close().awaitUninterruptibly();
}

private static DatagramChannel launchUdp(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ClientConfig config) throws InterruptedException {
ServerConfig current = config.getCurrent();
ChannelHandler udpTransportHandler;
Expand All @@ -99,4 +93,11 @@ protected void initChannel(Channel ch) {
})
.bind(InetAddress.getLoopbackAddress(), config.getPort()).sync().channel();
}

public record Instance(ServerSocketChannel tcp, DatagramChannel udp) {
public void close() {
tcp.close().awaitUninterruptibly();
udp.close().awaitUninterruptibly();
}
}
}
33 changes: 16 additions & 17 deletions urban-spork-server/src/com/urbanspork/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand All @@ -49,21 +48,21 @@ public static void launch(List<ServerConfig> configs) {
launch(configs, new CompletableFuture<>());
}

public static void launch(List<ServerConfig> configs, CompletableFuture<List<Map.Entry<ServerSocketChannel, Optional<DatagramChannel>>>> promise) {
public static void launch(List<ServerConfig> configs, CompletableFuture<List<Instance>> promise) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
List<Map.Entry<ServerSocketChannel, Optional<DatagramChannel>>> servers = new ArrayList<>(configs.size());
List<Instance> servers = new ArrayList<>(configs.size());
int count = 0;
for (ServerConfig config : configs) {
Map.Entry<ServerSocketChannel, Optional<DatagramChannel>> server = startup(bossGroup, workerGroup, config);
count += server.getValue().isPresent() ? 2 : 1;
Instance server = startup(bossGroup, workerGroup, config);
count += server.udp().isPresent() ? 2 : 1;
servers.add(server);
}
CountDownLatch latch = new CountDownLatch(count);
for (Map.Entry<ServerSocketChannel, Optional<DatagramChannel>> server : servers) {
server.getKey().closeFuture().addListener(future -> latch.countDown());
server.getValue().ifPresent(v -> v.closeFuture().addListener(future -> latch.countDown()));
for (Instance server : servers) {
server.tcp().closeFuture().addListener(future -> latch.countDown());
server.udp().ifPresent(v -> v.closeFuture().addListener(future -> latch.countDown()));
}
promise.complete(servers);
latch.await(); // main thread is waiting here
Expand All @@ -80,14 +79,7 @@ public static void launch(List<ServerConfig> configs, CompletableFuture<List<Map
}
}

public static void close(List<Map.Entry<ServerSocketChannel, Optional<DatagramChannel>>> servers) {
for (Map.Entry<ServerSocketChannel, Optional<DatagramChannel>> entry : servers) {
entry.getKey().close().awaitUninterruptibly();
entry.getValue().ifPresent(c -> c.close().awaitUninterruptibly());
}
}

private static Map.Entry<ServerSocketChannel, Optional<DatagramChannel>> startup(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ServerConfig config)
private static Instance startup(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ServerConfig config)
throws InterruptedException {
if (Protocols.shadowsocks == config.getProtocol()) {
List<ServerUserConfig> user = config.getUser();
Expand All @@ -101,7 +93,7 @@ private static Map.Entry<ServerSocketChannel, Optional<DatagramChannel>> startup
.bind(config.getPort()).sync().addListener(future -> logger.info("Startup tcp server => {}", config)).channel();
config.setPort(tcp.localAddress().getPort());
Optional<DatagramChannel> udp = startupUdp(bossGroup, workerGroup, config);
return Map.entry(tcp, udp);
return new Instance(tcp, udp);
}

private static Optional<DatagramChannel> startupUdp(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ServerConfig config) throws InterruptedException {
Expand All @@ -124,4 +116,11 @@ protected void initChannel(Channel ch) {
return Optional.empty();
}
}

public record Instance(ServerSocketChannel tcp, Optional<DatagramChannel> udp) {
public void close() {
tcp.close().awaitUninterruptibly();
udp.ifPresent(c -> c.close().awaitUninterruptibly());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.urbanspork.test.TestDice;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.handler.codec.socksx.v5.Socks5CommandType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
Expand All @@ -17,7 +15,6 @@

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

Expand All @@ -30,21 +27,21 @@ class ClientSocksHandshakeTestCase {
void testUdpEnable() throws InterruptedException, ExecutionException {
ClientConfig config = ClientConfigTestCase.testConfig(0, 0);
config.getServers().getFirst().setProtocol(Protocols.vmess);
Map.Entry<ServerSocketChannel, DatagramChannel> client = ClientTestCase.asyncLaunchClient(config);
Client.Instance client = ClientTestCase.asyncLaunchClient(config);
InetSocketAddress proxyAddress = new InetSocketAddress(config.getPort());
InetSocketAddress dstAddress1 = new InetSocketAddress(InetAddress.getLoopbackAddress(), TestDice.rollPort());
assertFailedHandshake(proxyAddress, dstAddress1);
Client.close(client);
client.close();
}

@Test
void testIllegalDstAddress() throws InterruptedException, ExecutionException {
ClientConfig config = ClientConfigTestCase.testConfig(0, 0);
Map.Entry<ServerSocketChannel, DatagramChannel> client = ClientTestCase.asyncLaunchClient(config);
Client.Instance client = ClientTestCase.asyncLaunchClient(config);
InetSocketAddress proxyAddress = new InetSocketAddress(config.getPort());
InetSocketAddress dstAddress1 = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
assertFailedHandshake(proxyAddress, dstAddress1);
Client.close(client);
client.close();
}


Expand Down
13 changes: 5 additions & 8 deletions urban-spork-test/test/com/urbanspork/client/ClientTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import com.urbanspork.common.protocol.socks.ClientHandshake;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.handler.codec.socksx.v5.Socks5CommandType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
Expand All @@ -24,7 +22,6 @@

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -64,10 +61,10 @@ void testExit() {
@Test
@Order(2)
void testLaunchFailed() throws InterruptedException, ExecutionException {
ServerSocketChannel c1 = asyncLaunchClient(ClientConfigTestCase.testConfig(0, 0)).getKey();
ClientConfig config = ClientConfigTestCase.testConfig(c1.localAddress().getPort(), 0);
Client.Instance client = asyncLaunchClient(ClientConfigTestCase.testConfig(0, 0));
ClientConfig config = ClientConfigTestCase.testConfig(client.tcp().localAddress().getPort(), 0);
Assertions.assertThrows(ExecutionException.class, () -> asyncLaunchClient(config));
c1.close();
client.close();
}

@ParameterizedTest
Expand All @@ -79,8 +76,8 @@ void testHandshake(Socks5CommandType type) {
Assertions.assertThrows(ExecutionException.class, () -> ClientHandshake.noAuth(group, type, proxyAddress, dstAddress).get(10, TimeUnit.SECONDS));
}

public static Map.Entry<ServerSocketChannel, DatagramChannel> asyncLaunchClient(ClientConfig config) throws InterruptedException, ExecutionException {
CompletableFuture<Map.Entry<ServerSocketChannel, DatagramChannel>> promise = new CompletableFuture<>();
public static Client.Instance asyncLaunchClient(ClientConfig config) throws InterruptedException, ExecutionException {
CompletableFuture<Client.Instance> promise = new CompletableFuture<>();
SERVICE.submit(() -> Client.launch(config, promise));
return promise.get();
}
Expand Down
16 changes: 7 additions & 9 deletions urban-spork-test/test/com/urbanspork/server/ServerTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
Expand All @@ -24,8 +22,6 @@
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -52,7 +48,7 @@ void launchRejected() {
void launchFailed() {
int port = TestDice.rollPort();
List<ServerConfig> configs = ServerConfigTestCase.testConfigs(port, port);
CompletableFuture<List<Map.Entry<ServerSocketChannel, Optional<DatagramChannel>>>> promise = new CompletableFuture<>();
CompletableFuture<List<Server.Instance>> promise = new CompletableFuture<>();
Server.launch(configs, promise);
Assertions.assertTrue(promise.isCompletedExceptionally());
}
Expand All @@ -78,9 +74,9 @@ void sendInvalidUDP() throws InterruptedException, ExecutionException {
ServerConfig config = ServerConfigTestCase.testConfig(0);
config.setNetworks(new Network[]{Network.TCP, Network.UDP});
try (ExecutorService service = Executors.newVirtualThreadPerTaskExecutor()) {
CompletableFuture<List<Map.Entry<ServerSocketChannel, Optional<DatagramChannel>>>> promise = new CompletableFuture<>();
CompletableFuture<List<Server.Instance>> promise = new CompletableFuture<>();
service.submit(() -> Server.launch(List.of(config), promise));
List<Map.Entry<ServerSocketChannel, Optional<DatagramChannel>>> servers = promise.get();
List<Server.Instance> servers = promise.get();
InetSocketAddress serverAddress = new InetSocketAddress(config.getHost(), config.getPort());
Channel channel = new Bootstrap().group(new NioEventLoopGroup())
.channel(NioDatagramChannel.class)
Expand All @@ -89,15 +85,17 @@ void sendInvalidUDP() throws InterruptedException, ExecutionException {
ChannelFuture future = channel.writeAndFlush(new DatagramPacket(Unpooled.wrappedBuffer(Dice.rollBytes(512)), serverAddress)).sync();
future.get();
Assertions.assertTrue(future.isDone());
Server.close(servers);
for (Server.Instance server : servers) {
server.close();
}
}
}

@Test
void sendInvalidTCP() throws InterruptedException, ExecutionException {
ServerConfig config = ServerConfigTestCase.testConfig(0);
try (ExecutorService service = Executors.newVirtualThreadPerTaskExecutor()) {
CompletableFuture<List<Map.Entry<ServerSocketChannel, Optional<DatagramChannel>>>> promise = new CompletableFuture<>();
CompletableFuture<List<Server.Instance>> promise = new CompletableFuture<>();
Future<?> server = service.submit(() -> Server.launch(List.of(config), promise));
promise.get();
Channel channel = new Bootstrap().group(new NioEventLoopGroup())
Expand Down
30 changes: 13 additions & 17 deletions urban-spork-test/test/com/urbanspork/test/TCPTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import com.urbanspork.server.Server;
import com.urbanspork.test.template.Parameter;
import com.urbanspork.test.template.TCPTestTemplate;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand All @@ -22,8 +20,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

@DisplayName("TCP")
Expand All @@ -41,19 +37,19 @@ void testByParameter(Parameter parameter) throws ExecutionException, Interrupted
serverConfig.setProtocol(protocol);
serverConfig.setCipher(cipher);
serverConfig.setPassword(parameter.serverPassword());
List<Map.Entry<ServerSocketChannel, Optional<DatagramChannel>>> server = launchServer(config.getServers());
Map.Entry<ServerSocketChannel, DatagramChannel> client = launchClient(config);
handshakeAndSendBytes(client.getKey().localAddress());
Server.close(server);
Client.close(client);
List<Server.Instance> server = launchServer(config.getServers());
Client.Instance client = launchClient(config);
handshakeAndSendBytes(client.tcp().localAddress());
closeServer(server);
client.close();
}

@Test
void testConnectServerFailed() throws ExecutionException, InterruptedException {
ClientConfig config = ClientConfigTestCase.testConfig(0, TestDice.rollPort());
Map.Entry<ServerSocketChannel, DatagramChannel> client = launchClient(config);
Assertions.assertThrows(ExecutionException.class, () -> handshakeAndSendBytes(client.getKey().localAddress()));
Client.close(client);
Client.Instance client = launchClient(config);
Assertions.assertThrows(ExecutionException.class, () -> handshakeAndSendBytes(client.tcp().localAddress()));
client.close();
}

void testShadowsocksAEAD2022EihByParameter(Parameter parameter) throws ExecutionException, InterruptedException {
Expand All @@ -66,16 +62,16 @@ void testShadowsocksAEAD2022EihByParameter(Parameter parameter) throws Execution
List<ServerUserConfig> user = new ArrayList<>();
user.add(new ServerUserConfig(TestDice.rollString(10), parameter.clientPassword()));
serverConfig.setUser(user);
List<Map.Entry<ServerSocketChannel, Optional<DatagramChannel>>> server = launchServer(List.of(serverConfig));
List<Server.Instance> server = launchServer(List.of(serverConfig));
ClientConfig config = ClientConfigTestCase.testConfig(0, serverConfig.getPort());
ServerConfig current = config.getCurrent();
current.setCipher(cipher);
current.setProtocol(protocol);
current.setPassword(parameter.serverPassword() + ":" + parameter.clientPassword());
Map.Entry<ServerSocketChannel, DatagramChannel> client = launchClient(config);
handshakeAndSendBytes(client.getKey().localAddress());
Client.Instance client = launchClient(config);
handshakeAndSendBytes(client.udp().localAddress());
ServerUserManager.DEFAULT.clear();
Server.close(server);
Client.close(client);
closeServer(server);
client.close();
}
}
Loading

0 comments on commit 9728268

Please sign in to comment.