Skip to content

Commit

Permalink
feat(ss): enhance the anti-detection capability
Browse files Browse the repository at this point in the history
  • Loading branch information
Zmax0 committed Jul 19, 2024
1 parent 25732df commit 5a148ca
Show file tree
Hide file tree
Showing 12 changed files with 291 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,15 @@ private void initAEAD2022PayloadDecoder(Session session, ByteBuf in, List<Object
}
if (session.context().checkNonceReplay(salt)) {
String msg = String.format("detected repeated nonce salt %s", ByteString.valueOf(salt));
throw new DecoderException(msg);
throw new RepeatedNonceException(msg);
}
session.identity().setRequestSalt(salt);
ByteBuf sealedHeaderBuf = Unpooled.buffer();
int sealedHeaderLength = eihLength + 1 + 8 + requestSaltLength + 2 + tagSize;
if (in.readableBytes() < sealedHeaderLength) {
String msg = String.format("header too short, expecting %d bytes, but found %d bytes", sealedHeaderLength, in.readableBytes());
throw new TooShortHeaderException(msg);
}
in.getBytes(in.readerIndex(), sealedHeaderBuf, sealedHeaderLength);
PayloadDecoder newPayloadDecoder;
if (requireEih) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.urbanspork.common.codec.shadowsocks.tcp;

import io.netty.handler.codec.DecoderException;

class RepeatedNonceException extends DecoderException {
RepeatedNonceException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.handler.codec.DecoderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -44,15 +43,28 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (Mode.Server == session.mode() && cause instanceof DecoderException) {
SocketChannel channel = (SocketChannel) ctx.channel();
String transLog = ExceptionHandler.transLog(channel);
logger.error("[tcp][{}] {}", transLog, cause.getMessage());
ctx.deregister();
channel.config().setSoLinger(0);
channel.shutdownOutput().addListener(future -> channel.unsafe().beginRead());
} else {
ctx.fireExceptionCaught(cause);
switch (cause) {
case TooShortHeaderException ignore -> {
logError(ctx, cause);
ctx.deregister();
SocketChannel channel = (SocketChannel) ctx.channel();
if (channel.isActive()) {
channel.config().setSoLinger(0);
channel.shutdownOutput();
}
}
case RepeatedNonceException ignore -> {
logError(ctx, cause);
SocketChannel channel = (SocketChannel) ctx.channel();
channel.config().setSoLinger(0);
channel.close(); // send RST
}
default -> ctx.fireExceptionCaught(cause);
}
}

private void logError(ChannelHandlerContext ctx, Throwable cause) {
String transLog = ExceptionHandler.transLog(ctx.channel());
logger.error("[tcp][{}] {}", transLog, cause.getMessage());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.urbanspork.common.codec.shadowsocks.tcp;

import io.netty.handler.codec.DecoderException;

class TooShortHeaderException extends DecoderException {
TooShortHeaderException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public RelayingPacket<ByteBuf> decode(Context context, ByteBuf in) throws Invali
}
}

// Client -> Server
// Client -> Server(*)
private RelayingPacket<ByteBuf> decodeClientPocketAead2022(Context context, ByteBuf in) throws InvalidCipherTextException {
int nonceLength = AEAD2022.UDP.getNonceLength(cipherKind);
int tagSize = cipherMethod.tagSize();
Expand Down Expand Up @@ -144,7 +144,7 @@ private RelayingPacket<ByteBuf> decodeClientPocketAead2022(Context context, Byte
return new RelayingPacket<>(address, packet);
}

// Server -> Client
// Server -> Client(*)
private RelayingPacket<ByteBuf> decodeServerPocketAead2022(Context context, ByteBuf in) throws InvalidCipherTextException {
int nonceLength = AEAD2022.UDP.getNonceLength(cipherKind);
int tagSize = cipherMethod.tagSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.concurrent.CompletableFuture;

public class EchoTestServer {
Expand Down Expand Up @@ -43,7 +44,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
});
}
})
.bind(port).addListener((ChannelFutureListener) future -> {
.bind(InetAddress.getLoopbackAddress(), port).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
ServerSocketChannel channel = (ServerSocketChannel) future.channel();
logger.info("Launch echo test server => {}", channel.localAddress());
Expand Down
123 changes: 123 additions & 0 deletions urban-spork-test/src/com/urbanspork/test/tool/TcpCapture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.urbanspork.test.tool;

import com.urbanspork.common.channel.DefaultChannelInboundHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

public class TcpCapture {
private static final Logger logger = LoggerFactory.getLogger(TcpCapture.class);
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final List<byte[]> outboundCapture = new ArrayList<>();
private final int remotePort;
private final boolean block;
private final ServerSocketChannel localChannel;
private final Promise<List<byte[]>> outboundPromise;

public TcpCapture(int remotePort, boolean block) {
this.remotePort = remotePort;
this.block = block;
this.localChannel = startup();
this.outboundPromise = bossGroup.next().newPromise();
}

public List<byte[]> nextOutboundCapture() throws InterruptedException, ExecutionException {
return outboundPromise.get();
}

public ServerSocketChannel getLocalChannel() {
return localChannel;
}

public void send(byte[] msg, BiConsumer<SocketChannel, byte[]> func) throws InterruptedException {
SocketChannel channel = (SocketChannel) new Bootstrap().group(bossGroup)
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
TcpCapture.logger.error("{}: {}", ctx.channel(), cause.getMessage());
ctx.close();
}
})
.connect(InetAddress.getLoopbackAddress(), remotePort)
.sync().channel();
func.accept(channel, msg);
channel.closeFuture().sync();
}

private ServerSocketChannel startup() {
return (ServerSocketChannel) new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInboundHandlerAdapter() {
private ChannelPromise promise;

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
promise = ctx.newPromise();
Channel inbound = ctx.channel();
new Bootstrap().group(inbound.eventLoop())
.channel(inbound.getClass())
.handler(new DefaultChannelInboundHandler(inbound))
.connect(InetAddress.getLoopbackAddress(), remotePort).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
Channel outbound = future.channel();
inbound.pipeline().addLast(new DefaultChannelInboundHandler(outbound));
promise.setSuccess();
} else {
logger.error("Connect target localhost:{} failed", remotePort, future.cause());
promise.setFailure(future.cause());
ctx.close();
}
}
);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf buf) {
logger.info("captured outbound: {} bytes", buf.readableBytes());
outboundCapture.add(ByteBufUtil.getBytes(buf));
if (!outboundPromise.isDone()) {
outboundPromise.setSuccess(outboundCapture);
}
if (block) {
return;
}
if (promise.isDone()) {
ctx.fireChannelRead(buf);
} else {
promise.addListener(c -> ctx.fireChannelRead(buf));
}
} else {
ctx.fireChannelRead(msg);
}
}
})
.bind(0).syncUninterruptibly().channel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void testAead2022TcpAntiReplay() {
Session serverSession = new Session(Mode.Server, identity, request, ServerUserManager.EMPTY, context);
List<Object> out = new ArrayList<>();
AeadCipherCodec serverCodec1 = AeadCipherCodecs.get(config);
Assertions.assertThrows(IndexOutOfBoundsException.class, () -> serverCodec1.decode(serverSession, tooShortMsg, out));
Assertions.assertThrows(TooShortHeaderException.class, () -> serverCodec1.decode(serverSession, tooShortMsg, out));
Assertions.assertDoesNotThrow(() -> serverCodec1.decode(serverSession, msg1, out));
AeadCipherCodec serverCodec2 = AeadCipherCodecs.get(config);
Assertions.assertThrows(DecoderException.class, () -> serverCodec2.decode(serverSession, msg2, out));
Expand Down
97 changes: 97 additions & 0 deletions urban-spork-test/test/com/urbanspork/test/TcpPreventionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.urbanspork.test;

import com.urbanspork.client.Client;
import com.urbanspork.common.codec.CipherKind;
import com.urbanspork.common.config.ClientConfig;
import com.urbanspork.common.config.ClientConfigTest;
import com.urbanspork.common.config.ServerConfig;
import com.urbanspork.common.config.ServerConfigTest;
import com.urbanspork.common.config.ServerUserConfig;
import com.urbanspork.common.protocol.Protocol;
import com.urbanspork.server.Server;
import com.urbanspork.test.template.TcpTestTemplate;
import com.urbanspork.test.tool.TcpCapture;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.util.NetUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

class TcpPreventionTest extends TcpTestTemplate {
private List<Server.Instance> server;
private TcpCapture capture;
private Client.Instance client;

@Test
void testTooShortHeader() throws InterruptedException, ExecutionException {
setUp(true);
Channel channel = connect(client.tcp().localAddress());
String request = "GET http://" + NetUtil.toSocketAddressString(dstAddress) + "/?a=b&c=d HTTP/1.1\r\n\r\n";
channel.writeAndFlush(Unpooled.wrappedBuffer(request.getBytes())).sync();
byte[] msg = capture.nextOutboundCapture().getLast();
capture.send(msg, (c, m) -> {
byte[] header = Arrays.copyOf(m, 42);
c.writeAndFlush(Unpooled.wrappedBuffer(header));
int i = 42;
for (; i < m.length; i++) {
if (!c.isActive()) {
c.writeAndFlush(Unpooled.wrappedBuffer(new byte[]{msg[i]}));
}
}
Assertions.assertEquals(msg.length, i);
});
closeServer(server);
client.close();
}

@Test
void testReplayAttack() throws ExecutionException, InterruptedException, TimeoutException {
setUp(false);
checkHttpSendBytes(client.tcp().localAddress());
byte[] msg = capture.nextOutboundCapture().getLast();
capture.send(msg, (c, m) -> {
byte[] header = Arrays.copyOf(m, 75);
c.writeAndFlush(Unpooled.wrappedBuffer(header));
int count = 75;
for (int i = 75; i < m.length; i++) {
if (c.isWritable()) {
count++;
c.writeAndFlush(Unpooled.wrappedBuffer(new byte[]{m[i]}));
} else {
break;
}
}
Assertions.assertEquals(msg.length, count);
});
closeServer(server);
client.close();
}

void setUp(boolean block) throws ExecutionException, InterruptedException {
ServerConfig serverConfig = ServerConfigTest.testConfig(0);
Protocol protocol = Protocol.shadowsocks;
CipherKind cipher = CipherKind.aead2022_blake3_aes_256_gcm;
serverConfig.setProtocol(protocol);
serverConfig.setCipher(cipher);
String clientPassword = TestDice.rollPassword(protocol, cipher);
String serverPassword = TestDice.rollPassword(protocol, cipher);
serverConfig.setPassword(serverPassword);
List<ServerUserConfig> user = new ArrayList<>();
user.add(new ServerUserConfig(TestDice.rollString(10), clientPassword));
serverConfig.setUser(user);
server = launchServer(List.of(serverConfig));
capture = new TcpCapture(serverConfig.getPort(), block);
ClientConfig config = ClientConfigTest.testConfig(0, capture.getLocalChannel().localAddress().getPort());
ServerConfig current = config.getCurrent();
current.setCipher(serverConfig.getCipher());
current.setProtocol(serverConfig.getProtocol());
current.setPassword(serverPassword + ":" + clientPassword);
client = launchClient(config);
}
}
16 changes: 8 additions & 8 deletions urban-spork-test/test/com/urbanspork/test/TcpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ void testByParameter(Parameter parameter) throws ExecutionException, Interrupted
Client.Instance client = launchClient(config);
InetSocketAddress clientAddress = client.tcp().localAddress();
socksHandshakeAndSendBytes(clientAddress);
httpsHandshakeAndSendBytes(clientAddress);
httpSendBytes(clientAddress);
checkHttpsHandshakeAndSendBytes(clientAddress);
checkHttpSendBytes(clientAddress);
closeServer(server);
client.close();
}
Expand All @@ -62,7 +62,7 @@ void testHttpBadRequest() throws ExecutionException, InterruptedException {
ClientConfig config = ClientConfigTest.testConfig(0, 0);
Client.Instance client = launchClient(config);
InetSocketAddress proxyAddress = client.udp().localAddress();
Assertions.assertThrows(ExecutionException.class, () -> httpSendBytes(proxyAddress, proxyAddress));
Assertions.assertThrows(ExecutionException.class, () -> checkHttpSendBytes(proxyAddress, proxyAddress));
}

@Test
Expand All @@ -71,8 +71,8 @@ void testConnectServerFailed() throws ExecutionException, InterruptedException {
Client.Instance client = launchClient(config);
InetSocketAddress clientAddress = client.tcp().localAddress();
Assertions.assertThrows(ExecutionException.class, () -> socksHandshakeAndSendBytes(clientAddress));
Assertions.assertThrows(ExecutionException.class, () -> httpsHandshakeAndSendBytes(clientAddress));
Assertions.assertThrows(ExecutionException.class, () -> httpSendBytes(clientAddress));
Assertions.assertThrows(ExecutionException.class, () -> checkHttpsHandshakeAndSendBytes(clientAddress));
Assertions.assertThrows(ExecutionException.class, () -> checkHttpSendBytes(clientAddress));
client.close();
}

Expand All @@ -93,10 +93,10 @@ void testShadowsocksAEAD2022EihByParameter(Parameter parameter) throws Execution
current.setProtocol(protocol);
current.setPassword(parameter.serverPassword() + ":" + parameter.clientPassword());
Client.Instance client = launchClient(config);
InetSocketAddress clientAddress = client.udp().localAddress();
InetSocketAddress clientAddress = client.tcp().localAddress();
socksHandshakeAndSendBytes(clientAddress);
httpsHandshakeAndSendBytes(clientAddress);
httpSendBytes(clientAddress);
checkHttpsHandshakeAndSendBytes(clientAddress);
checkHttpSendBytes(clientAddress);
ServerUserManager.DEFAULT.clear();
closeServer(server);
client.close();
Expand Down
Loading

0 comments on commit 5a148ca

Please sign in to comment.