Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
mnixry authored Sep 25, 2024
2 parents 2e2d0d6 + da8890c commit 30e4a42
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 49 deletions.
5 changes: 1 addition & 4 deletions component/sniffing/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,10 @@ func (s *Sniffer) SniffTcp() (d string, err error) {
if s.stream {
go func() {
// Read once.
n, err := s.buf.ReadFromOnce(s.r)
_, err = s.buf.ReadFromOnce(s.r)
if err != nil {
s.dataError = err
}
if n == 0 {
s.dataError = io.EOF
}
close(s.dataReady)
}()

Expand Down
3 changes: 3 additions & 0 deletions control/dns_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (c *DnsController) LookupDnsRespCache_(msg *dnsmessage.Msg, cacheKey string
cache := c.LookupDnsRespCache(cacheKey, ignoreFixedTtl)
if cache != nil {
cache.FillInto(msg)
msg.Compress = true
b, err := msg.Pack()
if err != nil {
c.log.Warnf("failed to pack: %v", err)
Expand Down Expand Up @@ -497,6 +498,7 @@ func (c *DnsController) sendReject_(dnsMessage *dnsmessage.Msg, req *udpRequest)
dnsMessage.Response = true
dnsMessage.RecursionAvailable = true
dnsMessage.Truncated = false
dnsMessage.Compress = true
if c.log.IsLevelEnabled(logrus.TraceLevel) {
c.log.WithFields(logrus.Fields{
"question": dnsMessage.Question,
Expand Down Expand Up @@ -756,6 +758,7 @@ func (c *DnsController) dialSend(invokingDepth int, req *udpRequest, data []byte
if needResp {
// Keep the id the same with request.
respMsg.Id = id
respMsg.Compress = true
data, err = respMsg.Pack()
if err != nil {
return err
Expand Down
24 changes: 13 additions & 11 deletions control/kern/tproxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1639,17 +1639,23 @@ int tproxy_dae0_ingress(struct __sk_buff *skb)
struct redirect_tuple redirect_tuple = {};

if (skb->protocol == bpf_htons(ETH_P_IP)) {
bpf_skb_load_bytes(skb, ETH_HLEN + offsetof(struct iphdr, daddr),
bpf_skb_load_bytes(skb,
ETH_HLEN + offsetof(struct iphdr, daddr),
&redirect_tuple.sip.u6_addr32[3],
sizeof(redirect_tuple.sip.u6_addr32[3]));
bpf_skb_load_bytes(skb, ETH_HLEN + offsetof(struct iphdr, saddr),
bpf_skb_load_bytes(skb,
ETH_HLEN + offsetof(struct iphdr, saddr),
&redirect_tuple.dip.u6_addr32[3],
sizeof(redirect_tuple.dip.u6_addr32[3]));
} else {
bpf_skb_load_bytes(skb, ETH_HLEN + offsetof(struct ipv6hdr, daddr),
&redirect_tuple.sip, sizeof(redirect_tuple.sip));
bpf_skb_load_bytes(skb, ETH_HLEN + offsetof(struct ipv6hdr, saddr),
&redirect_tuple.dip, sizeof(redirect_tuple.dip));
bpf_skb_load_bytes(skb,
ETH_HLEN + offsetof(struct ipv6hdr, daddr),
&redirect_tuple.sip,
sizeof(redirect_tuple.sip));
bpf_skb_load_bytes(skb,
ETH_HLEN + offsetof(struct ipv6hdr, saddr),
&redirect_tuple.dip,
sizeof(redirect_tuple.dip));
}
struct redirect_entry *redirect_entry =
bpf_map_lookup_elem(&redirect_track, &redirect_tuple);
Expand Down Expand Up @@ -1872,11 +1878,7 @@ int local_tcp_sockops(struct bpf_sock_ops *skops)
{
struct tuples_key rev_tuple = {};

rev_tuple.l4proto = IPPROTO_TCP;
rev_tuple.sport = tuple.dport;
rev_tuple.dport = tuple.sport;
__builtin_memcpy(&rev_tuple.sip, &tuple.dip, IPV6_BYTE_LENGTH);
__builtin_memcpy(&rev_tuple.dip, &tuple.sip, IPV6_BYTE_LENGTH);
copy_reversed_tuples(&tuple, &rev_tuple);

struct routing_result *routing_result;

Expand Down
11 changes: 7 additions & 4 deletions control/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ import (
"github.com/sirupsen/logrus"
)

const (
var (
DefaultNatTimeout = 3 * time.Minute
DnsNatTimeout = 17 * time.Second // RFC 5452
AnyfromTimeout = 5 * time.Second // Do not cache too long.
MaxRetry = 2
)

const (
DnsNatTimeout = 17 * time.Second // RFC 5452
AnyfromTimeout = 5 * time.Second // Do not cache too long.
MaxRetry = 2
)

type DialOption struct {
Expand Down
84 changes: 54 additions & 30 deletions control/udp_task_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,26 @@ package control

import (
"sync"
"sync/atomic"
"time"

"github.com/panjf2000/ants"
)

var isTest = false

const UdpTaskQueueLength = 128

type UdpTask = func()

// UdpTaskQueue make sure packets with the same key (4 tuples) will be sent in order.
type UdpTaskQueue struct {
key string
p *UdpTaskPool
ch chan UdpTask
timer *time.Timer
agingTime time.Duration
closed chan struct{}
closed atomic.Bool
freed chan struct{}
}

Expand All @@ -23,6 +30,30 @@ func (q *UdpTaskQueue) Push(task UdpTask) {
q.ch <- task
}

func (q *UdpTaskQueue) convoy() {
for {
if q.closed.Load() {
clearloop:
for {
select {
case t := <-q.ch:
// Emit it back due to closed q.
ReemitWorkers.Submit(func() {
q.p.EmitTask(q.key, t)
})
default:
break clearloop
}
}
close(q.freed)
return
} else {
t := <-q.ch
t()
}
}
}

type UdpTaskPool struct {
queueChPool sync.Pool
// mu protects m
Expand All @@ -41,62 +72,55 @@ func NewUdpTaskPool() *UdpTaskPool {
return p
}

func (p *UdpTaskPool) convoy(q *UdpTaskQueue) {
for {
select {
case <-q.closed:
clearloop:
for {
select {
case t := <-q.ch:
// Emit it back due to closed q.
p.EmitTask(q.key, t)
default:
break clearloop
}
}
close(q.freed)
return
case t := <-q.ch:
t()
}
}
}

// EmitTask: Make sure packets with the same key (4 tuples) will be sent in order.
func (p *UdpTaskPool) EmitTask(key string, task UdpTask) {
p.mu.Lock()
q, ok := p.m[key]
if !ok {
ch := p.queueChPool.Get().(chan UdpTask)
q = &UdpTaskQueue{
key: key,
p: p,
ch: ch,
timer: nil,
agingTime: DefaultNatTimeout,
closed: make(chan struct{}),
closed: atomic.Bool{},
freed: make(chan struct{}),
}
q.timer = time.AfterFunc(q.agingTime, func() {
// This func may be invoked twice due to concurrent Reset.
select {
case <-q.closed:
if !q.closed.CompareAndSwap(false, true) {
return
default:
}
if isTest {
time.Sleep(3 * time.Microsecond)
}
p.mu.Lock()
defer p.mu.Unlock()
if p.m[key] == q {
delete(p.m, key)
}
close(q.closed)
// Trigger next loop in func convoy
q.ch <- func() {}
<-q.freed
p.queueChPool.Put(ch)
})
p.m[key] = q
go p.convoy(q)
go q.convoy()
}
p.mu.Unlock()
q.Push(task)
}

var DefaultUdpTaskPool = NewUdpTaskPool()
var (
DefaultUdpTaskPool = NewUdpTaskPool()
ReemitWorkers *ants.Pool
)

func init() {
var err error
ReemitWorkers, err = ants.NewPool(UdpTaskQueueLength/2, ants.WithExpiryDuration(AnyfromTimeout))
if err != nil {
panic(err)
}
}
31 changes: 31 additions & 0 deletions control/udp_task_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) 2022-2024, daeuniverse Organization <[email protected]>
*/

package control

import (
"testing"
"time"

"github.com/shirou/gopsutil/v4/cpu"
"github.com/stretchr/testify/require"
)

func TestUdpTaskPool(t *testing.T) {
isTest = true
c, err := cpu.Times(false)
require.NoError(t, err)
t.Log(c)
DefaultNatTimeout = 1000 * time.Microsecond
for i := 0; i < 100; i++ {
DefaultUdpTaskPool.EmitTask("testkey", func() {
})
time.Sleep(99 * time.Microsecond)
}
time.Sleep(5 * time.Second)
c, err = cpu.Times(false)
require.NoError(t, err)
t.Log(c)
}
2 changes: 2 additions & 0 deletions docs/en/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ Check them using command like:

> **Note**: `Armbian` users can follow the [**Upgrade Guide**](user-guide/kernel-upgrade.md) to upgrade the kernel to meet the kernel configuration requirement.
> `Arch Linux ARM` users can use [linux-aarch64-7ji](https://github.com/7Ji-PKGBUILDs/linux-aarch64-7ji) which meets the kernel configuration requirement of dae.
## Installation

### Arch Linux / Manjaro
Expand Down
2 changes: 2 additions & 0 deletions docs/zh/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ CONFIG_BPF_EVENTS=y

> **注意**: `Armbian` 用户可以参考 [**Upgrade Guide**](../en/user-guide/kernel-upgrade.md) 升级到支持的内核。
> `Arch Linux ARM` 用户可以使用支持 dae 的 [linux-aarch64-7ji](https://github.com/7Ji-PKGBUILDs/linux-aarch64-7ji) 内核。
## 安装

### Arch Linux / Manjaro
Expand Down
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/shirou/gopsutil/v4 v4.24.5
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.9.0
github.com/v2rayA/ahocorasick-domain v0.0.0-20231231085011-99ceb8ef3208
github.com/vishvananda/netlink v1.1.0
github.com/vishvananda/netns v0.0.4
Expand All @@ -37,6 +38,7 @@ require (
github.com/awnumar/memguard v0.19.1 // indirect
github.com/cloudflare/circl v1.3.7 // indirect
github.com/daeuniverse/quic-go v0.0.0-20240413031024-943f218e0810 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
Expand All @@ -45,11 +47,17 @@ require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/pgzip v1.2.5 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/nwaples/rardecode v1.1.0 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/panjf2000/ants v1.3.0 // indirect
github.com/pierrec/lz4/v4 v4.1.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/ulikunitz/xz v0.5.9 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
Expand All @@ -58,6 +66,7 @@ require (
golang.org/x/net v0.22.0 // indirect
golang.org/x/tools v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
Expand Down
Loading

0 comments on commit 30e4a42

Please sign in to comment.