Skip to content

Commit

Permalink
Merge branch 'sbruens/service' into sbruens/logger
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed Sep 16, 2024
2 parents 48796e1 + 9d126f9 commit b7bbfa0
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 73 deletions.
16 changes: 8 additions & 8 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func init() {
)
}

type SSServer struct {
type OutlineServer struct {
stopConfig func() error
lnManager service.ListenerManager
natTimeout time.Duration
Expand All @@ -66,7 +66,7 @@ type SSServer struct {
replayCache service.ReplayCache
}

func (s *SSServer) loadConfig(filename string) error {
func (s *OutlineServer) loadConfig(filename string) error {
configData, err := os.ReadFile(filename)
if err != nil {
return fmt.Errorf("failed to read config file %s: %w", filename, err)
Expand Down Expand Up @@ -181,7 +181,7 @@ func (ls *listenerSet) Len() int {
return len(ls.listenerCloseFuncs)
}

func (s *SSServer) runConfig(config Config) (func() error, error) {
func (s *OutlineServer) runConfig(config Config) (func() error, error) {
startErrCh := make(chan error)
stopErrCh := make(chan error)
stopCh := make(chan struct{})
Expand Down Expand Up @@ -298,7 +298,7 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
}

// Stop stops serving the current config.
func (s *SSServer) Stop() error {
func (s *OutlineServer) Stop() error {
stopFunc := s.stopConfig
if stopFunc == nil {
return nil
Expand All @@ -311,9 +311,9 @@ func (s *SSServer) Stop() error {
return nil
}

// RunSSServer starts a shadowsocks server running, and returns the server or an error.
func RunSSServer(filename string, natTimeout time.Duration, serverMetrics *serverMetrics, serviceMetrics service.ServiceMetrics, replayHistory int) (*SSServer, error) {
server := &SSServer{
// RunOutlineServer starts an Outline server running, and returns the server or an error.
func RunOutlineServer(filename string, natTimeout time.Duration, serverMetrics *serverMetrics, serviceMetrics service.ServiceMetrics, replayHistory int) (*OutlineServer, error) {
server := &OutlineServer{
lnManager: service.NewListenerManager(),
natTimeout: natTimeout,
serverMetrics: serverMetrics,
Expand Down Expand Up @@ -405,7 +405,7 @@ func main() {
r := prometheus.WrapRegistererWithPrefix("shadowsocks_", prometheus.DefaultRegisterer)
r.MustRegister(serverMetrics, serviceMetrics)

_, err = RunSSServer(flags.ConfigFile, flags.natTimeout, serverMetrics, serviceMetrics, flags.replayHistory)
_, err = RunOutlineServer(flags.ConfigFile, flags.natTimeout, serverMetrics, serviceMetrics, flags.replayHistory)
if err != nil {
slog.Error("Server failed to start. Aborting.", "err", err)
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/outline-ss-server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (
"github.com/Jigsaw-Code/outline-ss-server/prometheus"
)

func TestRunSSServer(t *testing.T) {
func TestRunOutlineServer(t *testing.T) {
serverMetrics := newPrometheusServerMetrics()
serviceMetrics, err := prometheus.NewServiceMetrics(nil)
if err != nil {
t.Fatalf("Failed to create Prometheus service metrics: %v", err)
}
server, err := RunSSServer("config_example.yml", 30*time.Second, serverMetrics, serviceMetrics, 10000)
server, err := RunOutlineServer("config_example.yml", 30*time.Second, serverMetrics, serviceMetrics, 10000)
if err != nil {
t.Fatalf("RunSSServer() error = %v", err)
t.Fatalf("RunOutlineServer() error = %v", err)
}
if err := server.Stop(); err != nil {
t.Errorf("Error while stopping server: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions internal/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestTCPEcho(t *testing.T) {
replayCache := service.NewReplayCache(5)
const testTimeout = 200 * time.Millisecond
testMetrics := &statusMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{})
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{}, nil)
handler := service.NewStreamHandler(authFunc, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
done := make(chan struct{})
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestRestrictedAddresses(t *testing.T) {
require.NoError(t, err)
const testTimeout = 200 * time.Millisecond
testMetrics := &statusMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := service.NewStreamHandler(authFunc, testTimeout)
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -411,7 +411,7 @@ func BenchmarkTCPThroughput(b *testing.B) {
}
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPConnMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := service.NewStreamHandler(authFunc, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
done := make(chan struct{})
Expand Down Expand Up @@ -478,7 +478,7 @@ func BenchmarkTCPMultiplexing(b *testing.B) {
replayCache := service.NewReplayCache(service.MaxCapacity)
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPConnMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{})
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{}, nil)
handler := service.NewStreamHandler(authFunc, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
done := make(chan struct{})
Expand Down
59 changes: 20 additions & 39 deletions service/shadowsocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package service

import (
"context"
"fmt"
"net"
"time"

Expand Down Expand Up @@ -47,8 +46,8 @@ type Service interface {
HandlePacket(conn net.PacketConn)
}

// Option user's option.
type Option func(s *ssService) error
// Option is a Shadowsocks service constructor option.
type Option func(s *ssService)

type ssService struct {
logger Logger
Expand All @@ -65,88 +64,70 @@ func NewShadowsocksService(opts ...Option) (Service, error) {
s := &ssService{}

for _, opt := range opts {
if err := opt(s); err != nil {
return nil, fmt.Errorf("failed to create new service: %v", err)
}
opt(s)
}

if s.natTimeout == 0 {
s.natTimeout = defaultNatTimeout
}

// TODO: Register initial data metrics at zero.
s.sh = NewStreamHandler(
NewShadowsocksStreamAuthenticator(s.ciphers, s.replayCache, &ssConnMetrics{ServiceMetrics: s.m, proto: "tcp"}, s.logger),
tcpReadTimeout,
)
s.ph = NewPacketHandler(s.natTimeout, s.ciphers, s.m, &ssConnMetrics{ServiceMetrics: s.m, proto: "udp"})
if s.logger != nil {
s.sh.SetLogger(s.logger)
s.ph.SetLogger(s.logger)
}

return s, nil
}

// WithLogger can be used to provide a custom log target.
func WithLogger(l Logger) Option {
return func(s *ssService) error {
return func(s *ssService) {
s.logger = l
return nil
}
}

// WithCiphers option function.
func WithCiphers(ciphers CipherList) Option {
return func(s *ssService) error {
return func(s *ssService) {
s.ciphers = ciphers
return nil
}
}

// WithMetrics option function.
func WithMetrics(metrics ServiceMetrics) Option {
return func(s *ssService) error {
return func(s *ssService) {
s.m = metrics
return nil
}
}

// WithReplayCache option function.
func WithReplayCache(replayCache *ReplayCache) Option {
return func(s *ssService) error {
return func(s *ssService) {
s.replayCache = replayCache
return nil
}
}

// WithNatTimeout option function.
func WithNatTimeout(natTimeout time.Duration) Option {
return func(s *ssService) error {
return func(s *ssService) {
s.natTimeout = natTimeout
return nil
}
}

// HandleStream handles a Shadowsocks stream-based connection.
func (s *ssService) HandleStream(ctx context.Context, conn transport.StreamConn) {
if s.sh == nil {
authFunc := NewShadowsocksStreamAuthenticator(
s.ciphers,
s.replayCache,
&ssConnMetrics{ServiceMetrics: s.m, proto: "tcp"},
)
// TODO: Register initial data metrics at zero.
s.sh = NewStreamHandler(authFunc, tcpReadTimeout)
if s.logger != nil {
s.sh.SetLogger(s.logger)
}
}
connMetrics := s.m.AddOpenTCPConnection(conn)
s.sh.Handle(ctx, conn, connMetrics)
}

// HandlePacket handles a Shadowsocks packet connection.
func (s *ssService) HandlePacket(conn net.PacketConn) {
if s.ph == nil {
s.ph = NewPacketHandler(
s.natTimeout,
s.ciphers,
s.m,
&ssConnMetrics{ServiceMetrics: s.m, proto: "udp"},
)
if s.logger != nil {
s.ph.SetLogger(s.logger)
}
}
s.ph.Handle(conn)
}

Expand Down
18 changes: 9 additions & 9 deletions service/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func remoteIP(conn net.Conn) netip.Addr {
func debugTCP(l Logger, template string, cipherID string, attr slog.Attr) {
// This is an optimization to reduce unnecessary allocations due to an interaction
// between Go's inlining/escape analysis and varargs functions like slog.Debug.
if l.Enabled(nil, slog.LevelDebug) {
if l != nil && l.Enabled(nil, slog.LevelDebug) {
l.LogAttrs(nil, slog.LevelDebug, fmt.Sprintf("TCP: %s", template), slog.String("ID", cipherID), attr)
}
}
Expand All @@ -72,7 +72,7 @@ func debugTCP(l Logger, template string, cipherID string, attr slog.Attr) {
// required = saltSize + 2 + cipher.TagSize, the number of bytes needed to authenticate the connection.
const bytesForKeyFinding = 50

func findAccessKey(l Logger, clientReader io.Reader, clientIP netip.Addr, cipherList CipherList) (*CipherEntry, io.Reader, []byte, time.Duration, error) {
func findAccessKey(clientReader io.Reader, clientIP netip.Addr, cipherList CipherList, l Logger) (*CipherEntry, io.Reader, []byte, time.Duration, error) {
// We snapshot the list because it may be modified while we use it.
ciphers := cipherList.SnapshotForClientIP(clientIP)
firstBytes := make([]byte, bytesForKeyFinding)
Expand All @@ -81,7 +81,7 @@ func findAccessKey(l Logger, clientReader io.Reader, clientIP netip.Addr, cipher
}

findStartTime := time.Now()
entry, elt := findEntry(l, firstBytes, ciphers)
entry, elt := findEntry(firstBytes, ciphers, l)
timeToCipher := time.Since(findStartTime)
if entry == nil {
// TODO: Ban and log client IPs with too many failures too quick to protect against DoS.
Expand All @@ -95,7 +95,7 @@ func findAccessKey(l Logger, clientReader io.Reader, clientIP netip.Addr, cipher
}

// Implements a trial decryption search. This assumes that all ciphers are AEAD.
func findEntry(l Logger, firstBytes []byte, ciphers []*list.Element) (*CipherEntry, *list.Element) {
func findEntry(firstBytes []byte, ciphers []*list.Element, l Logger) (*CipherEntry, *list.Element) {
// To hold the decrypted chunk length.
chunkLenBuf := [2]byte{}
for ci, elt := range ciphers {
Expand All @@ -112,14 +112,14 @@ func findEntry(l Logger, firstBytes []byte, ciphers []*list.Element) (*CipherEnt
return nil, nil
}

type StreamAuthenticateFunc func(l Logger, clientConn transport.StreamConn) (string, transport.StreamConn, *onet.ConnectionError)
type StreamAuthenticateFunc func(clientConn transport.StreamConn) (string, transport.StreamConn, *onet.ConnectionError)

// NewShadowsocksStreamAuthenticator creates a stream authenticator that uses Shadowsocks.
// TODO(fortuna): Offer alternative transports.
func NewShadowsocksStreamAuthenticator(ciphers CipherList, replayCache *ReplayCache, metrics ShadowsocksConnMetrics) StreamAuthenticateFunc {
return func(l Logger, clientConn transport.StreamConn) (string, transport.StreamConn, *onet.ConnectionError) {
func NewShadowsocksStreamAuthenticator(ciphers CipherList, replayCache *ReplayCache, metrics ShadowsocksConnMetrics, l Logger) StreamAuthenticateFunc {
return func(clientConn transport.StreamConn) (string, transport.StreamConn, *onet.ConnectionError) {
// Find the cipher and acess key id.
cipherEntry, clientReader, clientSalt, timeToCipher, keyErr := findAccessKey(l, clientConn, remoteIP(clientConn), ciphers)
cipherEntry, clientReader, clientSalt, timeToCipher, keyErr := findAccessKey(clientConn, remoteIP(clientConn), ciphers, l)
metrics.AddCipherSearch(keyErr == nil, timeToCipher)
if keyErr != nil {
const status = "ERR_CIPHER"
Expand Down Expand Up @@ -327,7 +327,7 @@ func (h *streamHandler) handleConnection(ctx context.Context, outerConn transpor
}
outerConn.SetReadDeadline(readDeadline)

id, innerConn, authErr := h.authenticate(h.l, outerConn)
id, innerConn, authErr := h.authenticate(outerConn)
if authErr != nil {
// Drain to protect against probing attacks.
h.absorbProbe(outerConn, connMetrics, authErr.Status, proxyMetrics)
Expand Down
20 changes: 10 additions & 10 deletions service/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func BenchmarkTCPFindCipherFail(b *testing.B) {
}
clientIP := clientConn.RemoteAddr().(*net.TCPAddr).AddrPort().Addr()
b.StartTimer()
findAccessKey(&noopLogger{}, clientConn, clientIP, cipherList)
findAccessKey(clientConn, clientIP, cipherList, nil)
b.StopTimer()
}
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func BenchmarkTCPFindCipherRepeat(b *testing.B) {
cipher := cipherEntries[cipherNumber].CryptoKey
go shadowsocks.NewWriter(writer, cipher).Write(makeTestPayload(50))
b.StartTimer()
_, _, _, _, err := findAccessKey(&noopLogger{}, &c, clientIP, cipherList)
_, _, _, _, err := findAccessKey(&c, clientIP, cipherList, nil)
b.StopTimer()
if err != nil {
b.Error(err)
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestProbeRandom(t *testing.T) {
cipherList, err := MakeTestCiphers(makeTestSecrets(1))
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := NewStreamHandler(authFunc, 200*time.Millisecond)
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestProbeClientBytesBasicTruncated(t *testing.T) {
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
cipher := firstCipher(cipherList)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := NewStreamHandler(authFunc, 200*time.Millisecond)
handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll))
done := make(chan struct{})
Expand Down Expand Up @@ -403,7 +403,7 @@ func TestProbeClientBytesBasicModified(t *testing.T) {
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
cipher := firstCipher(cipherList)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := NewStreamHandler(authFunc, 200*time.Millisecond)
handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll))
done := make(chan struct{})
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestProbeClientBytesCoalescedModified(t *testing.T) {
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
cipher := firstCipher(cipherList)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := NewStreamHandler(authFunc, 200*time.Millisecond)
handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll))
done := make(chan struct{})
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestProbeServerBytesModified(t *testing.T) {
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
cipher := firstCipher(cipherList)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := NewStreamHandler(authFunc, 200*time.Millisecond)
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -522,7 +522,7 @@ func TestReplayDefense(t *testing.T) {
replayCache := NewReplayCache(5)
testMetrics := &probeTestMetrics{}
const testTimeout = 200 * time.Millisecond
authFunc := NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
authFunc := NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics, nil)
handler := NewStreamHandler(authFunc, testTimeout)
snapshot := cipherList.SnapshotForClientIP(netip.Addr{})
cipherEntry := snapshot[0].Value.(*CipherEntry)
Expand Down Expand Up @@ -604,7 +604,7 @@ func TestReverseReplayDefense(t *testing.T) {
replayCache := NewReplayCache(5)
testMetrics := &probeTestMetrics{}
const testTimeout = 200 * time.Millisecond
authFunc := NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
authFunc := NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics, nil)
handler := NewStreamHandler(authFunc, testTimeout)
snapshot := cipherList.SnapshotForClientIP(netip.Addr{})
cipherEntry := snapshot[0].Value.(*CipherEntry)
Expand Down Expand Up @@ -678,7 +678,7 @@ func probeExpectTimeout(t *testing.T, payloadSize int) {
cipherList, err := MakeTestCiphers(makeTestSecrets(5))
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := NewStreamHandler(authFunc, testTimeout)

done := make(chan struct{})
Expand Down

0 comments on commit b7bbfa0

Please sign in to comment.