Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow group override global node connectivity check #623

Merged
merged 7 commits into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions component/outbound/dialer/connectivity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"
"sync"
"time"
"unsafe"

"github.com/daeuniverse/dae/common"

Expand Down Expand Up @@ -452,6 +453,18 @@ func (d *Dialer) aliveBackground() {
}
}
}()
var unused int
for _, opt := range CheckOpts {
if len(d.mustGetCollection(opt.networkType).AliveDialerSetSet) == 0 {
unused++
}
}
if unused == len(CheckOpts) {
d.Log.WithField("dialer", d.Property().Name).
WithField("p", unsafe.Pointer(d)).
Traceln("cleaned up due to unused")
return
}
var wg sync.WaitGroup
for range d.checkCh {
for _, opt := range CheckOpts {
Expand Down
25 changes: 25 additions & 0 deletions component/outbound/dialer/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"fmt"
"sync"
"time"
"unsafe"

"github.com/daeuniverse/dae/common"
"github.com/daeuniverse/dae/config"
D "github.com/daeuniverse/outbound/dialer"
"github.com/daeuniverse/outbound/netproxy"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -60,6 +63,21 @@ type Property struct {

type AliveDialerSetSet map[*AliveDialerSet]int

func NewGlobalOption(global *config.Global, log *logrus.Logger) *GlobalOption {
return &GlobalOption{
ExtraOption: D.ExtraOption{
AllowInsecure: global.AllowInsecure,
TlsImplementation: global.TlsImplementation,
UtlsImitate: global.UtlsImitate},
Log: log,
TcpCheckOptionRaw: TcpCheckOptionRaw{Raw: global.TcpCheckUrl, Log: log, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Method: global.TcpCheckHttpMethod},
CheckDnsOptionRaw: CheckDnsOptionRaw{Raw: global.UdpCheckDns, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Somark: global.SoMarkFromDae},
CheckInterval: global.CheckInterval,
CheckTolerance: global.CheckTolerance,
CheckDnsTcp: true,
}
}

// NewDialer is for register in general.
func NewDialer(dialer netproxy.Dialer, option *GlobalOption, iOption InstanceOption, property *Property) *Dialer {
var collections [6]*collection
Expand All @@ -80,9 +98,16 @@ func NewDialer(dialer netproxy.Dialer, option *GlobalOption, iOption InstanceOpt
ctx: ctx,
cancel: cancel,
}
option.Log.WithField("dialer", d.Property().Name).
WithField("p", unsafe.Pointer(d)).
Traceln("NewDialer")
return d
}

func (d *Dialer) Clone() *Dialer {
return NewDialer(d.Dialer, d.GlobalOption, d.InstanceOption, d.property)
}

func (d *Dialer) Close() error {
d.cancel()
d.tickerMu.Lock()
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ type Group struct {
Filter [][]*config_parser.Function `mapstructure:"filter" repeatable:""`
FilterAnnotation [][]*config_parser.Param `mapstructure:"_"`
Policy FunctionListOrString `mapstructure:"policy" required:""`

TcpCheckUrl []string `mapstructure:"tcp_check_url"`
TcpCheckHttpMethod string `mapstructure:"tcp_check_http_method"`
UdpCheckDns []string `mapstructure:"udp_check_dns"`
CheckInterval time.Duration `mapstructure:"check_interval"`
CheckTolerance time.Duration `mapstructure:"check_tolerance"`
}

type DnsRequestRouting struct {
Expand Down
5 changes: 5 additions & 0 deletions config/desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,9 @@ min: Select node by the latency of last check.
min_avg10: Select node by the average of latencies of last 10 checks.
min_moving_avg: Select node by the moving average of latencies of checks, which means more recent latencies have higher weight.
`,
"tcp_check_url": "Override global config.",
"tcp_check_http_method": "Override global config.",
"udp_check_dns": "Override global config.",
"check_interval": "Override global config.",
"check_tolerance": "Override global config.",
}
60 changes: 46 additions & 14 deletions control/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/daeuniverse/dae/config"
"github.com/daeuniverse/dae/pkg/config_parser"
internal "github.com/daeuniverse/dae/pkg/ebpf_internal"
D "github.com/daeuniverse/outbound/dialer"
"github.com/daeuniverse/outbound/pool"
"github.com/daeuniverse/outbound/protocol/direct"
"github.com/daeuniverse/outbound/transport/grpc"
Expand Down Expand Up @@ -256,18 +255,7 @@ func NewControlPlane(
if global.AllowInsecure {
log.Warnln("AllowInsecure is enabled, but it is not recommended. Please make sure you have to turn it on.")
}
option := &dialer.GlobalOption{
ExtraOption: D.ExtraOption{
AllowInsecure: global.AllowInsecure,
TlsImplementation: global.TlsImplementation,
UtlsImitate: global.UtlsImitate},
Log: log,
TcpCheckOptionRaw: dialer.TcpCheckOptionRaw{Raw: global.TcpCheckUrl, Log: log, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Method: global.TcpCheckHttpMethod},
CheckDnsOptionRaw: dialer.CheckDnsOptionRaw{Raw: global.UdpCheckDns, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Somark: global.SoMarkFromDae},
CheckInterval: global.CheckInterval,
CheckTolerance: global.CheckTolerance,
CheckDnsTcp: true,
}
option := dialer.NewGlobalOption(global, log)

// Dial mode.
dialMode, err := consts.ParseDialMode(global.DialMode)
Expand Down Expand Up @@ -323,8 +311,22 @@ func NewControlPlane(
if len(dialers) == 0 {
log.Infoln("\t<Empty>")
}
groupOption, err := ParseGroupOverrideOption(group, *global, log)
finalOption := option
if err == nil && groupOption != nil {
newDialers := make([]*dialer.Dialer, 0)
for _, d := range dialers {
newDialer := d.Clone()
KagurazakaNyaa marked this conversation as resolved.
Show resolved Hide resolved
deferFuncs = append(deferFuncs, newDialer.Close)
newDialer.GlobalOption = groupOption
newDialers = append(newDialers, newDialer)
}
log.Infof(`Group "%v"'s check option has been override.`, group.Name)
dialers = newDialers
finalOption = groupOption
}
// Create dialer group and append it to outbounds.
dialerGroup := outbound.NewDialerGroup(option, group.Name, dialers, annos, *policy,
dialerGroup := outbound.NewDialerGroup(finalOption, group.Name, dialers, annos, *policy,
core.outboundAliveChangeCallback(uint8(len(outbounds)), disableKernelAliveCallback))
outbounds = append(outbounds, dialerGroup)
}
Expand Down Expand Up @@ -515,6 +517,36 @@ func ParseFixedDomainTtl(ks []config.KeyableString) (map[string]int, error) {
return m, nil
}

func ParseGroupOverrideOption(group config.Group, global config.Global, log *logrus.Logger) (*dialer.GlobalOption, error) {
result := global
changed := false
if group.TcpCheckUrl != nil {
result.TcpCheckUrl = group.TcpCheckUrl
changed = true
}
if group.TcpCheckHttpMethod != "" {
result.TcpCheckHttpMethod = group.TcpCheckHttpMethod
changed = true
}
if group.UdpCheckDns != nil {
result.UdpCheckDns = group.UdpCheckDns
changed = true
}
if group.CheckInterval != 0 {
result.CheckInterval = group.CheckInterval
changed = true
}
if group.CheckTolerance != 0 {
result.CheckTolerance = group.CheckTolerance
changed = true
}
if changed {
option := dialer.NewGlobalOption(&result, log)
return option, nil
}
return nil, nil
}

// EjectBpf will resect bpf from destroying life-cycle of control plane.
func (c *ControlPlane) EjectBpf() *bpfObjects {
return c.core.EjectBpf()
Expand Down
20 changes: 20 additions & 0 deletions example.dae
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ global {
auto_config_kernel_parameter: true

##### Node connectivity check.
# These options, as defaults, are effective when no definition is given in the group.

# Host of URL should have both IPv4 and IPv6 if you have double stack in local.
# First is URL, others are IP addresses if given.
Expand Down Expand Up @@ -213,6 +214,24 @@ group {
# Select the node with min average of the last 10 latencies from the group for every connection.
policy: min_avg10
}

steam {
# Filter nodes from the global node pool defined by the subscription and node section above.
filter: subtag(my_sub) && !name(keyword: 'ExpireAt:')
# Select the node with min moving average of latencies from the group for every connection.
policy: min_moving_avg

# Override tcp_check_url in global.
tcp_check_url: 'http://test.steampowered.com'
# Override tcp_check_http_method in global
#tcp_check_http_method: HEAD
# Override udp_check_dns in global
#udp_check_dns: 'dns.google.com:53,8.8.8.8,2001:4860:4860::8888'
# Override check_interval in global
#check_interval: 30s
# Override check_tolerance in global
#check_tolerance: 50ms
}
}

# See https://github.com/daeuniverse/dae/blob/main/docs/en/configuration/routing.md for full examples.
Expand All @@ -238,6 +257,7 @@ routing {
l4proto(udp) && dport(443) -> block
dip(geoip:cn) -> direct
domain(geosite:cn) -> direct


fallback: my_group
}
Loading