Skip to content

Commit

Permalink
change to use perf_event for exec events
Browse files Browse the repository at this point in the history
  • Loading branch information
mozillazg committed Apr 28, 2024
1 parent 680f2d2 commit 24080e1
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 115 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Please download the latest binary in the [releases](https://github.com/mozillazg

### Requirements

Linux kernel version must be larger than 5.7.
Linux kernel version must be larger than 4.18.


## Usage
Expand Down
18 changes: 0 additions & 18 deletions bpf/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/florianl/go-tc"
"github.com/florianl/go-tc/core"
"github.com/jschwinger233/elibpcap"
Expand Down Expand Up @@ -198,22 +196,6 @@ func (b *BPF) AttachTcHooks(ifindex int, egress, ingress bool) error {
return nil
}

func (b *BPF) NewPacketEventReader() (*perf.Reader, error) {
reader, err := perf.NewReader(b.objs.PacketEvents, 1500*1000)
if err != nil {
return nil, xerrors.Errorf(": %w", err)
}
return reader, nil
}

func (b *BPF) NewExecEventReader() (*ringbuf.Reader, error) {
reader, err := ringbuf.NewReader(b.objs.ExecEvents)
if err != nil {
return nil, xerrors.Errorf(": %w", err)
}
return reader, nil
}

func (o Options) attachForks() bool {
return o.FollowForks == 1
}
Expand Down
25 changes: 14 additions & 11 deletions bpf/bpf_x86_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified bpf/bpf_x86_bpfel.o
Binary file not shown.
115 changes: 115 additions & 0 deletions bpf/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package bpf

import (
"bytes"
"context"
"encoding/binary"
"errors"
"github.com/cilium/ebpf/perf"
"golang.org/x/xerrors"
"log"
"unsafe"
)

func (b *BPF) PullPacketEvents(ctx context.Context) (<-chan BpfPacketEventT, error) {
reader, err := perf.NewReader(b.objs.PacketEvents, 1500*1000)
if err != nil {
return nil, xerrors.Errorf(": %w", err)
}
ch := make(chan BpfPacketEventT, 10)
go func() {
defer close(ch)
defer reader.Close()
b.handlePacketEvents(ctx, reader, ch)
}()

return ch, nil
}

func (b *BPF) handlePacketEvents(ctx context.Context, reader *perf.Reader, ch chan<- BpfPacketEventT) {
for {
select {
case <-ctx.Done():
return
default:
}

record, err := reader.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
return
}
log.Printf("read packet event failed: %s", err)
continue
}
event, err := parsePacketEvent(record.RawSample)
if err != nil {
log.Printf("parse packet event failed: %s", err)
} else {
ch <- *event
}
if record.LostSamples > 0 {
// TODO: XXX
}
}
}

func parsePacketEvent(rawSample []byte) (*BpfPacketEventT, error) {
event := BpfPacketEventT{}
if err := binary.Read(bytes.NewBuffer(rawSample), binary.LittleEndian, &event.Meta); err != nil {
return nil, xerrors.Errorf("parse meta: %w", err)
}
copy(event.Payload[:], rawSample[unsafe.Offsetof(event.Payload):])
return &event, nil
}

func (b *BPF) PullExecEvents(ctx context.Context) (<-chan BpfExecEventT, error) {
reader, err := perf.NewReader(b.objs.ExecEvents, 1024*256)
if err != nil {
return nil, xerrors.Errorf(": %w", err)
}
ch := make(chan BpfExecEventT, 10)
go func() {
defer close(ch)
defer reader.Close()
b.handleExecEvents(ctx, reader, ch)
}()

return ch, nil
}

func (b *BPF) handleExecEvents(ctx context.Context, reader *perf.Reader, ch chan<- BpfExecEventT) {
for {
select {
case <-ctx.Done():
return
default:
}

record, err := reader.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
return
}
log.Printf("read exec event failed: %s", err)
continue
}
event, err := parseExecEvent(record.RawSample)
if err != nil {
log.Printf("parse exec event failed: %s", err)
} else {
ch <- *event
}
if record.LostSamples > 0 {
// TODO: XXX
}
}
}

func parseExecEvent(rawSample []byte) (*BpfExecEventT, error) {
event := BpfExecEventT{}
if err := binary.Read(bytes.NewBuffer(rawSample), binary.LittleEndian, &event); err != nil {
return nil, xerrors.Errorf("parse event: %w", err)
}
return &event, nil
}
35 changes: 23 additions & 12 deletions bpf/ptcpdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
static volatile const u32 filter_pid = 0;
static volatile const u8 filter_follow_forks = 0;
volatile const char filter_comm[TASK_COMM_LEN];
static const u8 u8_zero = 0;
static const u32 u32_zero = 0;

char _license[] SEC("license") = "Dual MIT/GPL";

Expand Down Expand Up @@ -98,8 +100,16 @@ struct exec_event_t {
};

struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1024 * 512);
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(max_entries, 1);
__type(key, u32);
__type(value, struct exec_event_t);
} exec_event_stack SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u32));
} exec_events SEC(".maps");

struct {
Expand All @@ -114,7 +124,7 @@ struct {
__uint(max_entries, 1);
__type(key, u32);
__type(value, struct packet_event_t);
} bpf_stack SEC(".maps");
} packet_event_stack SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
Expand Down Expand Up @@ -334,8 +344,7 @@ static __always_inline int process_filter(struct task_struct *task) {
}

if (should_filter) {
u8 zero = 0;
bpf_map_update_elem(&filter_pid_map, &pid, &zero, BPF_NOEXIST);
bpf_map_update_elem(&filter_pid_map, &pid, &u8_zero, BPF_NOEXIST);
}

return 0;
Expand Down Expand Up @@ -363,8 +372,7 @@ static __always_inline void handle_fork(struct trace_event_raw_sys_exit *ctx) {
}
}
if (should_filter) {
u8 zero = 0;
bpf_map_update_elem(&filter_pid_map, &child_pid, &zero, BPF_NOEXIST);
bpf_map_update_elem(&filter_pid_map, &child_pid, &u8_zero, BPF_NOEXIST);
// bpf_printk("handle fork: %d", child_pid);
}
return;
Expand Down Expand Up @@ -469,9 +477,9 @@ static __always_inline void handle_tc(struct __sk_buff *skb, bool egress) {
}

struct packet_event_t *event;
u32 zero = 0;
event = bpf_map_lookup_elem(&bpf_stack, &zero);
event = bpf_map_lookup_elem(&packet_event_stack, &u32_zero);
if (!event) {
bpf_printk("[ptcpdump] packet_event_stack failed");
return;
}
// __builtin_memset(event, 0, sizeof(*event));
Expand Down Expand Up @@ -504,9 +512,9 @@ static __always_inline void handle_exec(struct trace_event_raw_sched_process_exe
}

struct exec_event_t *event;
event = bpf_ringbuf_reserve(&exec_events, sizeof(*event), 0);
event = bpf_map_lookup_elem(&exec_event_stack, &u32_zero);
if (!event) {
bpf_printk("[ptcpdump] bpf_ringbuf_reserve failed");
bpf_printk("[ptcpdump] exec_event_stack failed");
return;
}

Expand Down Expand Up @@ -539,7 +547,10 @@ static __always_inline void handle_exec(struct trace_event_raw_sched_process_exe
event->args_size = arg_length;
}

bpf_ringbuf_submit(event, 0);
int event_ret = bpf_perf_event_output(ctx, &exec_events, BPF_F_CURRENT_CPU, event, sizeof(*event));
if (event_ret != 0) {
bpf_printk("[ptcpdump] bpf_perf_event_output exec_events failed: %d", event_ret);
}
return;
}

Expand Down
20 changes: 9 additions & 11 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,27 +92,25 @@ func run(cmd *cobra.Command, args []string) error {
}
defer bf.Close()

packetEventReader, err := bf.NewPacketEventReader()
ctx, stop := signal.NotifyContext(
context.Background(), syscall.SIGINT, syscall.SIGTERM,
)
defer stop()

packetEvensCh, err := bf.PullPacketEvents(ctx)
if err != nil {
return err
}
defer packetEventReader.Close()
execEventReader, err := bf.NewExecEventReader()
execEvensCh, err := bf.PullExecEvents(ctx)
if err != nil {
return err
}
defer execEventReader.Close()

ctx, stop := signal.NotifyContext(
context.Background(), syscall.SIGINT, syscall.SIGTERM,
)
defer stop()

execConsumer := consumer.NewExecEventConsumer(pcache)
go execConsumer.Start(ctx, execEventReader)
go execConsumer.Start(ctx, execEvensCh)
packetConsumer := consumer.NewPacketEventConsumer(writers, devices)
go func() {
packetConsumer.Start(ctx, packetEventReader, opts.maxPacketCount)
packetConsumer.Start(ctx, packetEvensCh, opts.maxPacketCount)
stop()
}()

Expand Down
24 changes: 6 additions & 18 deletions internal/consumer/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package consumer

import (
"context"
"errors"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/mozillazg/ptcpdump/bpf"
"github.com/mozillazg/ptcpdump/internal/event"
"github.com/mozillazg/ptcpdump/internal/metadata"
"log"
Expand All @@ -20,29 +18,19 @@ func NewExecEventConsumer(pcache *metadata.ProcessCache) *ExecEventConsumer {
}
}

func (c *ExecEventConsumer) Start(ctx context.Context, reader *ringbuf.Reader) {
func (c *ExecEventConsumer) Start(ctx context.Context, ch <-chan bpf.BpfExecEventT) {
for {
select {
case <-ctx.Done():
return
default:
case et := <-ch:
c.parseExecEvent(et)
}

record, err := reader.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
log.Println("[ExecEventConsumer] Received signal, exiting...")
return
}
log.Printf("[ExecEventConsumer] read event failed: %s", err)
continue
}
c.parseExecEvent(record.RawSample)
}
}

func (c *ExecEventConsumer) parseExecEvent(rawSample []byte) {
e, err := event.ParseProcessExecEvent(rawSample)
func (c *ExecEventConsumer) parseExecEvent(et bpf.BpfExecEventT) {
e, err := event.ParseProcessExecEvent(et)
if err != nil {
log.Printf("[ExecEventConsumer] parse event failed: %s", err)
return
Expand Down
Loading

0 comments on commit 24080e1

Please sign in to comment.