Skip to content

Commit

Permalink
feat: allow group override global node connectivity check (#623)
Browse files Browse the repository at this point in the history
Co-authored-by: mzz2017 <[email protected]>
  • Loading branch information
KagurazakaNyaa and mzz2017 committed Sep 8, 2024
1 parent c885661 commit 9f04adf
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 14 deletions.
13 changes: 13 additions & 0 deletions component/outbound/dialer/connectivity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"
"sync"
"time"
"unsafe"

"github.com/daeuniverse/dae/common"

Expand Down Expand Up @@ -452,6 +453,18 @@ func (d *Dialer) aliveBackground() {
}
}
}()
var unused int
for _, opt := range CheckOpts {
if len(d.mustGetCollection(opt.networkType).AliveDialerSetSet) == 0 {
unused++
}
}
if unused == len(CheckOpts) {
d.Log.WithField("dialer", d.Property().Name).
WithField("p", unsafe.Pointer(d)).
Traceln("cleaned up due to unused")
return
}
var wg sync.WaitGroup
for range d.checkCh {
for _, opt := range CheckOpts {
Expand Down
25 changes: 25 additions & 0 deletions component/outbound/dialer/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"fmt"
"sync"
"time"
"unsafe"

"github.com/daeuniverse/dae/common"
"github.com/daeuniverse/dae/config"
D "github.com/daeuniverse/outbound/dialer"
"github.com/daeuniverse/outbound/netproxy"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -60,6 +63,21 @@ type Property struct {

type AliveDialerSetSet map[*AliveDialerSet]int

func NewGlobalOption(global *config.Global, log *logrus.Logger) *GlobalOption {
return &GlobalOption{
ExtraOption: D.ExtraOption{
AllowInsecure: global.AllowInsecure,
TlsImplementation: global.TlsImplementation,
UtlsImitate: global.UtlsImitate},
Log: log,
TcpCheckOptionRaw: TcpCheckOptionRaw{Raw: global.TcpCheckUrl, Log: log, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Method: global.TcpCheckHttpMethod},
CheckDnsOptionRaw: CheckDnsOptionRaw{Raw: global.UdpCheckDns, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Somark: global.SoMarkFromDae},
CheckInterval: global.CheckInterval,
CheckTolerance: global.CheckTolerance,
CheckDnsTcp: true,
}
}

// NewDialer is for register in general.
func NewDialer(dialer netproxy.Dialer, option *GlobalOption, iOption InstanceOption, property *Property) *Dialer {
var collections [6]*collection
Expand All @@ -80,9 +98,16 @@ func NewDialer(dialer netproxy.Dialer, option *GlobalOption, iOption InstanceOpt
ctx: ctx,
cancel: cancel,
}
option.Log.WithField("dialer", d.Property().Name).
WithField("p", unsafe.Pointer(d)).
Traceln("NewDialer")
return d
}

func (d *Dialer) Clone() *Dialer {
return NewDialer(d.Dialer, d.GlobalOption, d.InstanceOption, d.property)
}

func (d *Dialer) Close() error {
d.cancel()
d.tickerMu.Lock()
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ type Group struct {
Filter [][]*config_parser.Function `mapstructure:"filter" repeatable:""`
FilterAnnotation [][]*config_parser.Param `mapstructure:"_"`
Policy FunctionListOrString `mapstructure:"policy" required:""`

TcpCheckUrl []string `mapstructure:"tcp_check_url"`
TcpCheckHttpMethod string `mapstructure:"tcp_check_http_method"`
UdpCheckDns []string `mapstructure:"udp_check_dns"`
CheckInterval time.Duration `mapstructure:"check_interval"`
CheckTolerance time.Duration `mapstructure:"check_tolerance"`
}

type DnsRequestRouting struct {
Expand Down
5 changes: 5 additions & 0 deletions config/desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,9 @@ min: Select node by the latency of last check.
min_avg10: Select node by the average of latencies of last 10 checks.
min_moving_avg: Select node by the moving average of latencies of checks, which means more recent latencies have higher weight.
`,
"tcp_check_url": "Override global config.",
"tcp_check_http_method": "Override global config.",
"udp_check_dns": "Override global config.",
"check_interval": "Override global config.",
"check_tolerance": "Override global config.",
}
60 changes: 46 additions & 14 deletions control/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/daeuniverse/dae/config"
"github.com/daeuniverse/dae/pkg/config_parser"
internal "github.com/daeuniverse/dae/pkg/ebpf_internal"
D "github.com/daeuniverse/outbound/dialer"
"github.com/daeuniverse/outbound/pool"
"github.com/daeuniverse/outbound/protocol/direct"
"github.com/daeuniverse/outbound/transport/grpc"
Expand Down Expand Up @@ -256,18 +255,7 @@ func NewControlPlane(
if global.AllowInsecure {
log.Warnln("AllowInsecure is enabled, but it is not recommended. Please make sure you have to turn it on.")
}
option := &dialer.GlobalOption{
ExtraOption: D.ExtraOption{
AllowInsecure: global.AllowInsecure,
TlsImplementation: global.TlsImplementation,
UtlsImitate: global.UtlsImitate},
Log: log,
TcpCheckOptionRaw: dialer.TcpCheckOptionRaw{Raw: global.TcpCheckUrl, Log: log, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Method: global.TcpCheckHttpMethod},
CheckDnsOptionRaw: dialer.CheckDnsOptionRaw{Raw: global.UdpCheckDns, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Somark: global.SoMarkFromDae},
CheckInterval: global.CheckInterval,
CheckTolerance: global.CheckTolerance,
CheckDnsTcp: true,
}
option := dialer.NewGlobalOption(global, log)

// Dial mode.
dialMode, err := consts.ParseDialMode(global.DialMode)
Expand Down Expand Up @@ -323,8 +311,22 @@ func NewControlPlane(
if len(dialers) == 0 {
log.Infoln("\t<Empty>")
}
groupOption, err := ParseGroupOverrideOption(group, *global, log)
finalOption := option
if err == nil && groupOption != nil {
newDialers := make([]*dialer.Dialer, 0)
for _, d := range dialers {
newDialer := d.Clone()
deferFuncs = append(deferFuncs, newDialer.Close)
newDialer.GlobalOption = groupOption
newDialers = append(newDialers, newDialer)
}
log.Infof(`Group "%v"'s check option has been override.`, group.Name)
dialers = newDialers
finalOption = groupOption
}
// Create dialer group and append it to outbounds.
dialerGroup := outbound.NewDialerGroup(option, group.Name, dialers, annos, *policy,
dialerGroup := outbound.NewDialerGroup(finalOption, group.Name, dialers, annos, *policy,
core.outboundAliveChangeCallback(uint8(len(outbounds)), disableKernelAliveCallback))
outbounds = append(outbounds, dialerGroup)
}
Expand Down Expand Up @@ -515,6 +517,36 @@ func ParseFixedDomainTtl(ks []config.KeyableString) (map[string]int, error) {
return m, nil
}

func ParseGroupOverrideOption(group config.Group, global config.Global, log *logrus.Logger) (*dialer.GlobalOption, error) {
result := global
changed := false
if group.TcpCheckUrl != nil {
result.TcpCheckUrl = group.TcpCheckUrl
changed = true
}
if group.TcpCheckHttpMethod != "" {
result.TcpCheckHttpMethod = group.TcpCheckHttpMethod
changed = true
}
if group.UdpCheckDns != nil {
result.UdpCheckDns = group.UdpCheckDns
changed = true
}
if group.CheckInterval != 0 {
result.CheckInterval = group.CheckInterval
changed = true
}
if group.CheckTolerance != 0 {
result.CheckTolerance = group.CheckTolerance
changed = true
}
if changed {
option := dialer.NewGlobalOption(&result, log)
return option, nil
}
return nil, nil
}

// EjectBpf will resect bpf from destroying life-cycle of control plane.
func (c *ControlPlane) EjectBpf() *bpfObjects {
return c.core.EjectBpf()
Expand Down
20 changes: 20 additions & 0 deletions example.dae
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ global {
auto_config_kernel_parameter: true

##### Node connectivity check.
# These options, as defaults, are effective when no definition is given in the group.

# Host of URL should have both IPv4 and IPv6 if you have double stack in local.
# First is URL, others are IP addresses if given.
Expand Down Expand Up @@ -213,6 +214,24 @@ group {
# Select the node with min average of the last 10 latencies from the group for every connection.
policy: min_avg10
}

steam {
# Filter nodes from the global node pool defined by the subscription and node section above.
filter: subtag(my_sub) && !name(keyword: 'ExpireAt:')
# Select the node with min moving average of latencies from the group for every connection.
policy: min_moving_avg

# Override tcp_check_url in global.
tcp_check_url: 'http://test.steampowered.com'
# Override tcp_check_http_method in global
#tcp_check_http_method: HEAD
# Override udp_check_dns in global
#udp_check_dns: 'dns.google.com:53,8.8.8.8,2001:4860:4860::8888'
# Override check_interval in global
#check_interval: 30s
# Override check_tolerance in global
#check_tolerance: 50ms
}
}

# See https://github.com/daeuniverse/dae/blob/main/docs/en/configuration/routing.md for full examples.
Expand All @@ -238,6 +257,7 @@ routing {
l4proto(udp) && dport(443) -> block
dip(geoip:cn) -> direct
domain(geosite:cn) -> direct


fallback: my_group
}

0 comments on commit 9f04adf

Please sign in to comment.