Skip to content

Commit

Permalink
feat(ss): assign a unique ServerUserManager to each server instance
Browse files Browse the repository at this point in the history
  • Loading branch information
Zmax0 committed Aug 14, 2024
1 parent 0c651dd commit fb03c60
Show file tree
Hide file tree
Showing 22 changed files with 111 additions and 80 deletions.
2 changes: 1 addition & 1 deletion urban-spork-client-gui/resource/startup.cmd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@echo off
set server="urban-spork-client-gui"
start javaw --module-path lib --add-modules javafx.controls --add-modules com.jfoenix --add-opens java.base/java.lang.reflect=com.jfoenix --add-opens javafx.controls/com.sun.javafx.scene.control.behavior=com.jfoenix --add-opens javafx.controls/com.sun.javafx.scene.control=com.jfoenix -jar %server%.jar
start javaw --module-path lib --add-modules javafx.controls --add-modules com.jfoenix --add-opens java.base/java.lang.reflect=com.jfoenix --add-opens javafx.controls/com.sun.javafx.scene.control.behavior=com.jfoenix --add-opens javafx.controls/com.sun.javafx.scene.control=com.jfoenix -Xms64m -Xmx256m -Dio.netty.maxDirectMemory=0 -jar %server%.jar
exit
2 changes: 1 addition & 1 deletion urban-spork-client-gui/resource/startup.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash
server="urban-spork-client-gui"
"$JAVA_HOME"/bin/java --module-path lib --add-modules javafx.controls --add-modules com.jfoenix --add-opens java.base/java.lang.reflect=com.jfoenix --add-opens javafx.controls/com.sun.javafx.scene.control.behavior=com.jfoenix --add-opens javafx.controls/com.sun.javafx.scene.control=com.jfoenix -jar ${server}.jar
"$JAVA_HOME"/bin/java --module-path lib --add-modules javafx.controls --add-modules com.jfoenix --add-opens java.base/java.lang.reflect=com.jfoenix --add-opens javafx.controls/com.sun.javafx.scene.control.behavior=com.jfoenix --add-opens javafx.controls/com.sun.javafx.scene.control=com.jfoenix -Xms64m -Xmx256m -Dio.netty.maxDirectMemory=0 -jar ${server}.jar
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,16 @@
import com.urbanspork.common.config.ServerConfig;
import com.urbanspork.common.config.SslSetting;
import com.urbanspork.common.config.WebSocketSetting;
import com.urbanspork.common.manage.shadowsocks.ServerUserManager;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
Expand Down Expand Up @@ -95,7 +102,7 @@ protected void initChannel(Channel outbound) throws Exception {
buildSslHandler(outbound, config),
new ClientHeaderEncoder(config.getPassword(), address, SocksCmdType.CONNECT.byteValue())
);
default -> outbound.pipeline().addLast(new TcpRelayCodec(new Context(), config, address, Mode.Client));
default -> outbound.pipeline().addLast(new TcpRelayCodec(new Context(), config, address, Mode.Client, ServerUserManager.empty()));
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.urbanspork.common.codec.shadowsocks.Mode;
import com.urbanspork.common.codec.shadowsocks.udp.UdpRelayCodec;
import com.urbanspork.common.config.ServerConfig;
import com.urbanspork.common.manage.shadowsocks.ServerUserManager;
import com.urbanspork.common.transport.udp.DatagramPacketWrapper;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand All @@ -31,7 +32,7 @@ public ClientUdpRelayHandler(ServerConfig config, EventLoopGroup workerGroup) {
super(config, Duration.ofMinutes(10));
this.workerGroup = workerGroup;
this.relay = new InetSocketAddress(config.getHost(), config.getPort());
this.codec = new UdpRelayCodec(config, Mode.Client);
this.codec = new UdpRelayCodec(config, Mode.Client, ServerUserManager.empty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ public class TcpRelayCodec extends ByteToMessageCodec<ByteBuf> {
private final Session session;
private final AeadCipherCodec cipher;

public TcpRelayCodec(Context context, ServerConfig config, Mode mode) {
this(context, config, null, mode);
public TcpRelayCodec(Context context, ServerConfig config, Mode mode, ServerUserManager userManager) {
this(context, config, null, mode, userManager);
}

public TcpRelayCodec(Context context, ServerConfig config, InetSocketAddress request, Mode mode) {
ServerUserManager userManager = Mode.Server == mode ? ServerUserManager.DEFAULT : ServerUserManager.EMPTY;
public TcpRelayCodec(Context context, ServerConfig config, InetSocketAddress request, Mode mode, ServerUserManager userManager) {
Identity identity = new Identity(config.getCipher());
this.session = new Session(mode, identity, request, userManager, context);
this.cipher = AeadCipherCodecs.get(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@
public class UdpRelayCodec extends MessageToMessageCodec<DatagramPacket, DatagramPacketWrapper> {
private static final Logger logger = LoggerFactory.getLogger(UdpRelayCodec.class);
private final ServerConfig config;
private final AeadCipherCodec cipher;
private final Mode mode;
private final LruCache<InetSocketAddress, Control> controlMap;
private final ServerUserManager userManager;
private final AeadCipherCodec cipher;
private final LruCache<InetSocketAddress, Control> controlMap;
private final Control control0;

public UdpRelayCodec(ServerConfig config, Mode mode) {
public UdpRelayCodec(ServerConfig config, Mode mode, ServerUserManager userManager) {
this.config = config;
this.cipher = AeadCipherCodecs.get(config);
this.mode = mode;
this.userManager = userManager;
this.cipher = AeadCipherCodecs.get(config);
this.controlMap = new LruCache<>(1024, Duration.ofMinutes(5), (k, v) -> logger.info("[udp]control map expire {}={}", k, v));
this.userManager = Mode.Server == mode ? ServerUserManager.DEFAULT : ServerUserManager.EMPTY;
this.control0 = new Control(config.getCipher());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
package com.urbanspork.common.manage.shadowsocks;

import com.urbanspork.common.config.ServerConfig;
import com.urbanspork.common.config.ServerUserConfig;
import com.urbanspork.common.util.ByteString;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public enum ServerUserManager {

DEFAULT(new ConcurrentHashMap<>()),
EMPTY(Collections.emptyMap());
public class ServerUserManager {

private final Map<BytesKey, ServerUser> users;

ServerUserManager(Map<BytesKey, ServerUser> users) {
public static ServerUserManager from(ServerConfig config) {
ServerUserManager manager = new ServerUserManager(new ConcurrentHashMap<>());
List<ServerUserConfig> user = config.getUser();
if (user != null) {
user.stream().map(ServerUser::from).forEach(manager::addUser);
}
return manager;
}

public static ServerUserManager empty() {
return new ServerUserManager(Collections.emptyMap());
}

private ServerUserManager(Map<BytesKey, ServerUser> users) {
this.users = users;
}

Expand Down Expand Up @@ -58,5 +73,10 @@ public boolean equals(Object o) {
public int hashCode() {
return Arrays.hashCode(bytes);
}

@Override
public String toString() {
return ByteString.valueOf(bytes);
}
}
}
39 changes: 16 additions & 23 deletions urban-spork-server/src/com/urbanspork/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
import com.urbanspork.common.codec.shadowsocks.udp.UdpRelayCodec;
import com.urbanspork.common.config.ConfigHandler;
import com.urbanspork.common.config.ServerConfig;
import com.urbanspork.common.config.ServerUserConfig;
import com.urbanspork.common.manage.shadowsocks.ServerUser;
import com.urbanspork.common.manage.shadowsocks.ServerUserManager;
import com.urbanspork.common.protocol.Protocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
Expand Down Expand Up @@ -48,13 +48,14 @@ public static void launch(List<ServerConfig> configs) {
}

public static void launch(List<ServerConfig> configs, CompletableFuture<List<Instance>> promise) {
Context context = Context.newCheckReplayInstance();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
List<Instance> servers = new ArrayList<>(configs.size());
int count = 0;
for (ServerConfig config : configs) {
Instance server = startup(bossGroup, workerGroup, config);
Instance server = startup(bossGroup, workerGroup, new ServerInitializationContext(config, context));
count += server.udp().isPresent() ? 2 : 1;
servers.add(server);
}
Expand All @@ -73,45 +74,37 @@ public static void launch(List<ServerConfig> configs, CompletableFuture<List<Ins
logger.error("Startup server failed", e);
promise.completeExceptionally(e);
} finally {
context.release();
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

private static Instance startup(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ServerConfig config)
private static Instance startup(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ServerInitializationContext context)
throws InterruptedException {
if (Protocol.shadowsocks == config.getProtocol()) {
List<ServerUserConfig> user = config.getUser();
if (user != null) {
user.stream().map(ServerUser::from).forEach(ServerUserManager.DEFAULT::addUser);
}
}
Context context = Context.newCheckReplayInstance();
ServerConfig config = context.config();
ServerSocketChannel tcp = (ServerSocketChannel) new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerInitializer(config, context))
.bind(config.getPort()).addListener(future -> {
if (!future.isSuccess()) {
context.release();
}
})
.childHandler(new ServerInitializer(context))
.bind(config.getPort())
.sync().addListener(future -> logger.info("Startup tcp server => {}", config))
.channel().closeFuture().addListener(future -> context.release()).channel();
.channel().closeFuture().channel();
config.setPort(tcp.localAddress().getPort());
Optional<DatagramChannel> udp = startupUdp(bossGroup, workerGroup, config);
Optional<DatagramChannel> udp = startupUdp(bossGroup, workerGroup, context);
return new Instance(tcp, udp);
}

private static Optional<DatagramChannel> startupUdp(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ServerConfig config) throws InterruptedException {
private static Optional<DatagramChannel> startupUdp(EventLoopGroup bossGroup, EventLoopGroup workerGroup, ServerInitializationContext context) throws InterruptedException {
ServerConfig config = context.config();
if (Protocol.shadowsocks == config.getProtocol() && config.udpEnabled()) {
Channel channel = new Bootstrap().group(bossGroup).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(
new UdpRelayCodec(config, Mode.Server),
new UdpRelayCodec(config, Mode.Server, context.userManager()),
new ServerUdpRelayHandler(config.getPacketEncoding(), workerGroup),
new ExceptionHandler(config)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.urbanspork.server;

import com.urbanspork.common.codec.shadowsocks.tcp.Context;
import com.urbanspork.common.config.ServerConfig;
import com.urbanspork.common.manage.shadowsocks.ServerUserManager;

public record ServerInitializationContext(ServerConfig config, Context context, ServerUserManager userManager) {
public ServerInitializationContext(ServerConfig config, Context context) {
this(config, context, ServerUserManager.from(config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.urbanspork.common.channel.ExceptionHandler;
import com.urbanspork.common.codec.shadowsocks.Mode;
import com.urbanspork.common.codec.shadowsocks.tcp.Context;
import com.urbanspork.common.codec.shadowsocks.tcp.TcpRelayCodec;
import com.urbanspork.common.config.ServerConfig;
import com.urbanspork.common.config.SslSetting;
Expand All @@ -28,16 +27,15 @@

public class ServerInitializer extends ChannelInitializer<Channel> {

private final ServerConfig config;
private final Context context;
private final ServerInitializationContext context;

public ServerInitializer(ServerConfig config, Context context) {
this.config = config;
public ServerInitializer(ServerInitializationContext context) {
this.context = context;
}

@Override
protected void initChannel(Channel c) throws SSLException {
ServerConfig config = context.config();
if (config.wsEnabled()) {
enableWebSocket(c);
}
Expand All @@ -53,12 +51,12 @@ protected void initChannel(Channel c) throws SSLException {
SslHandler sslHandler = sslContext.newHandler(c.alloc(), serverName, config.getPort());
c.pipeline().addLast(sslHandler, new ServerHeaderDecoder(config), new ExceptionHandler(config));
}
default -> c.pipeline().addLast(new TcpRelayCodec(context, config, Mode.Server), new ExceptionHandler(config), new ServerRelayHandler(config));
default -> c.pipeline().addLast(new TcpRelayCodec(context.context(), config, Mode.Server, context.userManager()), new ExceptionHandler(config), new ServerRelayHandler(config));
}
}

private void enableWebSocket(Channel channel) {
String path = Optional.ofNullable(config.getWs()).map(WebSocketSetting::getPath).orElseThrow(() -> new IllegalArgumentException("required path not present"));
String path = Optional.ofNullable(context.config().getWs()).map(WebSocketSetting::getPath).orElseThrow(() -> new IllegalArgumentException("required path not present"));
channel.pipeline().addLast(
new HttpServerCodec(),
new WebSocketServerProtocolHandler(path, null, true, 0xfffff),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.urbanspork.common.codec.shadowsocks.tcp.TcpRelayCodec;
import com.urbanspork.common.codec.shadowsocks.udp.UdpRelayCodec;
import com.urbanspork.common.config.ServerConfig;
import com.urbanspork.common.manage.shadowsocks.ServerUserManager;
import com.urbanspork.common.protocol.Protocol;
import com.urbanspork.common.transport.tcp.RelayingPayload;
import com.urbanspork.common.transport.udp.DatagramPacketWrapper;
Expand Down Expand Up @@ -34,9 +35,9 @@ void testTcpRelayChannel() {
config.setCipher(cipher);
config.setPassword(password);
EmbeddedChannel client = new EmbeddedChannel();
client.pipeline().addLast(new TcpRelayCodec(new Context(), config, request, Mode.Client));
client.pipeline().addLast(new TcpRelayCodec(new Context(), config, request, Mode.Client, ServerUserManager.empty()));
EmbeddedChannel server = new EmbeddedChannel();
server.pipeline().addLast(new TcpRelayCodec(new Context(), config, Mode.Server));
server.pipeline().addLast(new TcpRelayCodec(new Context(), config, Mode.Server, ServerUserManager.from(config)));
String message = TestDice.rollString();
client.writeOutbound(Unpooled.wrappedBuffer(message.getBytes()));
ByteBuf msg = client.readOutbound();
Expand Down Expand Up @@ -67,8 +68,8 @@ void testUdpRelayChannel() {
ServerConfig config = new ServerConfig();
config.setPassword(TestDice.rollPassword(Protocol.shadowsocks, cipher));
config.setCipher(cipher);
client.pipeline().addLast(new UdpRelayCodec(config, Mode.Client));
server.pipeline().addLast(new UdpRelayCodec(config, Mode.Server));
client.pipeline().addLast(new UdpRelayCodec(config, Mode.Client, ServerUserManager.empty()));
server.pipeline().addLast(new UdpRelayCodec(config, Mode.Server, ServerUserManager.from(config)));
String host = "192.168.255.1";
String message = TestDice.rollString();
InetSocketAddress dst = new InetSocketAddress(host, port);
Expand All @@ -93,8 +94,8 @@ void testAead2022UdpAntiReplay() {
ServerConfig config = new ServerConfig();
config.setPassword(TestDice.rollPassword(Protocol.shadowsocks, kind));
config.setCipher(kind);
client.pipeline().addLast(new UdpRelayCodec(config, Mode.Client));
server.pipeline().addLast(new UdpRelayCodec(config, Mode.Server));
client.pipeline().addLast(new UdpRelayCodec(config, Mode.Client, ServerUserManager.empty()));
server.pipeline().addLast(new UdpRelayCodec(config, Mode.Server, ServerUserManager.from(config)));
InetSocketAddress sender = new InetSocketAddress(InetAddress.getLoopbackAddress(), 16801);
InetSocketAddress recipient = new InetSocketAddress(InetAddress.getLoopbackAddress(), 16802);
InetSocketAddress porxy = new InetSocketAddress(InetAddress.getLoopbackAddress(), 16803);
Expand Down
Loading

0 comments on commit fb03c60

Please sign in to comment.