Skip to content

Commit

Permalink
Remove dead processes from the process cache through handle exit even…
Browse files Browse the repository at this point in the history
…ts (#79)
  • Loading branch information
mozillazg authored Jun 29, 2024
1 parent 54f0ecc commit 76e94e8
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 51 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,12 @@ Filter by Pod:
sudo ptcpdump -i any --pod-name test.default
```

Save/read pcapng file:
Save data in PcapNG format:

```
sudo ptcpdump -i any -w demo.pcapng
sudo ptcpdump -i any -w - port 80 | tcpdump -n -r -
sudo ptcpdump -i any -w - port 80 | tshark -r -
ptcpdump -r demo.pcapng
```

<p align="right"><a href="#top">🔝</a></p>
Expand Down
2 changes: 1 addition & 1 deletion bpf/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

// $TARGET is set by the Makefile
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -no-strip -target $TARGET -type packet_event_t -type exec_event_t -type flow_pid_key_t -type process_meta_t -type packet_event_meta_t Bpf ./ptcpdump.c -- -I./headers -I./headers/$TARGET -I. -Wall
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -no-strip -target $TARGET -type packet_event_t -type exec_event_t -type exit_event_t -type flow_pid_key_t -type process_meta_t -type packet_event_meta_t Bpf ./ptcpdump.c -- -I./headers -I./headers/$TARGET -I. -Wall

const tcFilterName = "ptcpdump"
const logSzie = ebpf.DefaultVerifierLogSize * 32
Expand Down
5 changes: 5 additions & 0 deletions bpf/bpf_arm64_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified bpf/bpf_arm64_bpfel.o
Binary file not shown.
5 changes: 5 additions & 0 deletions bpf/bpf_x86_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified bpf/bpf_x86_bpfel.o
Binary file not shown.
77 changes: 71 additions & 6 deletions bpf/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ type BpfPacketEventWithPayloadT struct {
}

func (b *BPF) PullPacketEvents(ctx context.Context, chanSize int, maxPacketSize int) (<-chan BpfPacketEventWithPayloadT, error) {
perCPUBuffer := os.Getpagesize()
log.Debugf("pagesize is %d", perCPUBuffer)
perCPUBuffer = perCPUBuffer * 64
pageSize := os.Getpagesize()
log.Debugf("pagesize is %d", pageSize)
perCPUBuffer := pageSize * 64
eventSize := int(unsafe.Sizeof(BpfPacketEventT{})) + maxPacketSize
if eventSize >= perCPUBuffer {
perCPUBuffer = perCPUBuffer * (1 + (eventSize / perCPUBuffer))
Expand Down Expand Up @@ -87,15 +87,16 @@ func parsePacketEvent(rawSample []byte) (*BpfPacketEventWithPayloadT, error) {
}

func (b *BPF) PullExecEvents(ctx context.Context, chanSize int) (<-chan BpfExecEventT, error) {
perCPUBuffer := os.Getpagesize()
log.Debugf("pagesize is %d", perCPUBuffer)
pageSize := os.Getpagesize()
log.Debugf("pagesize is %d", pageSize)
perCPUBuffer := pageSize * 64
eventSize := int(unsafe.Sizeof(BpfExecEventT{}))
if eventSize >= perCPUBuffer {
perCPUBuffer = perCPUBuffer * (1 + (eventSize / perCPUBuffer))
}
log.Debugf("use %d as perCPUBuffer", perCPUBuffer)

reader, err := perf.NewReader(b.objs.ExecEvents, 1024*256)
reader, err := perf.NewReader(b.objs.ExecEvents, perCPUBuffer)
if err != nil {
return nil, xerrors.Errorf(": %w", err)
}
Expand Down Expand Up @@ -148,3 +149,67 @@ func parseExecEvent(rawSample []byte) (*BpfExecEventT, error) {
}
return &event, nil
}

// PullExitEvents starts a background goroutine that reads process exit events
// from the b.objs.ExitEvents perf map and returns a channel delivering them.
//
// chanSize is the capacity of the returned channel. The per-CPU ring buffer is
// four pages, scaled up if a single event would not fit. An error is returned
// only when the perf reader cannot be created.
//
// The returned channel is closed and the reader released when the goroutine
// stops, which happens when ctx is cancelled or the reader is closed.
func (b *BPF) PullExitEvents(ctx context.Context, chanSize int) (<-chan BpfExitEventT, error) {
	pageSize := os.Getpagesize()
	log.Debugf("pagesize is %d", pageSize)
	perCPUBuffer := pageSize * 4
	eventSize := int(unsafe.Sizeof(BpfExitEventT{}))
	if eventSize >= perCPUBuffer {
		perCPUBuffer = perCPUBuffer * (1 + (eventSize / perCPUBuffer))
	}
	log.Debugf("use %d as perCPUBuffer", perCPUBuffer)

	reader, err := perf.NewReader(b.objs.ExitEvents, perCPUBuffer)
	if err != nil {
		return nil, xerrors.Errorf(": %w", err)
	}
	ch := make(chan BpfExitEventT, chanSize)
	go func() {
		defer close(ch)
		defer reader.Close()

		// reader.Read blocks until a sample arrives, so the ctx check inside
		// handleExitEvents is never reached while the system is idle. Closing
		// the reader on cancellation interrupts the pending Read and lets the
		// goroutine finish instead of leaking.
		done := make(chan struct{})
		defer close(done)
		go func() {
			select {
			case <-ctx.Done():
				reader.Close()
			case <-done:
			}
		}()

		b.handleExitEvents(ctx, reader, ch)
	}()

	return ch, nil
}

// handleExitEvents reads records from reader, decodes them into BpfExitEventT
// values and forwards them on ch until ctx is cancelled or the reader is
// closed.
//
// Read errors other than perf.ErrClosed are logged and the loop continues.
// Records that only report lost samples are logged and skipped, as are records
// that fail to decode.
func (b *BPF) handleExitEvents(ctx context.Context, reader *perf.Reader, ch chan<- BpfExitEventT) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		record, err := reader.Read()
		if err != nil {
			if errors.Is(err, perf.ErrClosed) {
				return
			}
			if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
				log.Debugf("got EOF error: %s", err)
				continue
			}
			log.Errorf("read exit event failed: %s", err)
			continue
		}

		// A lost-sample record carries no payload; parsing it would only
		// produce a misleading "parse exit event failed" error.
		if record.LostSamples > 0 {
			log.Errorf("lost %d exit events", record.LostSamples)
			continue
		}

		event, err := parseExitEvent(record.RawSample)
		if err != nil {
			log.Errorf("parse exit event failed: %s", err)
			continue
		}

		// Do not block forever on a full channel once the consumers have
		// stopped because of cancellation.
		select {
		case ch <- *event:
		case <-ctx.Done():
			return
		}
	}
}

// parseExitEvent decodes rawSample as a little-endian BpfExitEventT.
//
// It returns a pointer to the decoded event, or a wrapped error when the
// sample cannot be read into the struct.
func parseExitEvent(rawSample []byte) (*BpfExitEventT, error) {
	var decoded BpfExitEventT

	src := bytes.NewBuffer(rawSample)
	err := binary.Read(src, binary.LittleEndian, &decoded)
	if err != nil {
		return nil, xerrors.Errorf("parse event: %w", err)
	}

	return &decoded, nil
}
31 changes: 29 additions & 2 deletions bpf/ptcpdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ struct exec_event_t {
char args[EXEC_ARGS_LEN];
};

/* Event sent to user space through the exit_events perf map by handle_exit. */
struct exit_event_t {
/* Filled from the task's tgid in handle_exit. */
u32 pid;
};

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(max_entries, 1);
Expand All @@ -122,6 +126,12 @@ struct {
__uint(value_size, sizeof(u32));
} exec_events SEC(".maps");

/*
 * Perf event array that handle_exit writes struct exit_event_t records to
 * via bpf_perf_event_output.
 */
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u32));
} exit_events SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, 65536);
Expand Down Expand Up @@ -177,6 +187,7 @@ const struct packet_event_t *unused1 __attribute__((unused));
const struct exec_event_t *unused2 __attribute__((unused));
const struct flow_pid_key_t *unused3 __attribute__((unused));
const struct process_meta_t *unused4 __attribute__((unused));
const struct exit_event_t *unused5 __attribute__((unused));

static __always_inline int parse_skb_l2(struct __sk_buff *skb, struct l2_t *l2, u32 *offset) {
if (bpf_skb_load_bytes(skb, *offset + offsetof(struct ethhdr, h_proto), &l2->h_protocol, sizeof(l2->h_protocol)) <
Expand Down Expand Up @@ -790,8 +801,11 @@ static __always_inline void handle_tc(struct __sk_buff *skb, bool egress) {
}
event->meta.payload_len = payload_len;

bpf_perf_event_output(skb, &packet_events, BPF_F_CURRENT_CPU | (payload_len << 32), event,
sizeof(struct packet_event_t));
int event_ret = bpf_perf_event_output(skb, &packet_events, BPF_F_CURRENT_CPU | (payload_len << 32), event,
sizeof(struct packet_event_t));
if (event_ret != 0) {
bpf_printk("[ptcpdump] bpf_perf_event_output exec_events failed: %d", event_ret);
}

return;
}
Expand Down Expand Up @@ -848,11 +862,24 @@ static __always_inline void handle_exit(struct bpf_raw_tracepoint_args *ctx) {
// args: struct task_struct *p
struct task_struct *task = (struct task_struct *)BPF_CORE_READ(ctx, args[0]);

atomic_t live = BPF_CORE_READ(task, signal, live);
if (live.counter > 0) {
return;
}

u32 pid = BPF_CORE_READ(task, tgid);
if (bpf_map_lookup_elem(&filter_pid_map, &pid)) {
bpf_map_delete_elem(&filter_pid_map, &pid);
}

struct exit_event_t event = {
.pid = pid,
};
int event_ret = bpf_perf_event_output(ctx, &exit_events, BPF_F_CURRENT_CPU, &event, sizeof(event));
if (event_ret != 0) {
bpf_printk("[ptcpdump] bpf_perf_event_output exit_events failed: %d", event_ret);
}

return;
}

Expand Down
10 changes: 8 additions & 2 deletions cmd/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func capture(ctx context.Context, stop context.CancelFunc, opts Options) error {
fcloser()
}
}()
pcache.Start()
pcache.Start(ctx)

log.Debug("start get current connections")
conns := getCurrentConnects(ctx, pcache, opts)
Expand All @@ -75,9 +75,15 @@ func capture(ctx context.Context, stop context.CancelFunc, opts Options) error {
if err != nil {
return err
}
exitEvensCh, err := bf.PullExitEvents(ctx, int(opts.eventChanSize))
if err != nil {
return err
}

execConsumer := consumer.NewExecEventConsumer(pcache, int(opts.execEventsWorkerNumber))
go execConsumer.Start(ctx, execEvensCh)
exitConsumer := consumer.NewExitEventConsumer(pcache, 10)
go exitConsumer.Start(ctx, exitEvensCh)

var stopByInternal bool
packetConsumer := consumer.NewPacketEventConsumer(writers)
Expand Down Expand Up @@ -119,7 +125,7 @@ func headerTips(opts Options) {
log.Warn("ptcpdump: verbose output suppressed, use -v[v]... for verbose output")
log.Warn(msg)
} else {
log.Warnf("tcpdump: %s", msg)
log.Warnf("ptcpdump: %s", msg)
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func init() {
rootCmd.Flags().UintVar(&opts.eventChanSize, "event-chan-size", 10, "Size of event chan")
rootCmd.Flags().DurationVar(&opts.delayBeforeHandlePacketEvents, "delay-before-handle-packet-events", 0,
"Delay some durations before handle packet events")
rootCmd.Flags().UintVar(&opts.execEventsWorkerNumber, "exec-events-worker-number", 10,
rootCmd.Flags().UintVar(&opts.execEventsWorkerNumber, "exec-events-worker-number", 20,
"Number of worker to handle exec events")
rootCmd.Flags().BoolVar(&opts.oneLine, "oneline", false,
"Print parsed packet output in a single line")
Expand Down
52 changes: 52 additions & 0 deletions internal/consumer/exit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package consumer

import (
"context"
"sync"

"github.com/mozillazg/ptcpdump/bpf"
"github.com/mozillazg/ptcpdump/internal/metadata"
)

// ExitEventConsumer consumes process exit events and marks the corresponding
// processes as dead in the process cache.
type ExitEventConsumer struct {
// pcache is the process cache updated for every exit event.
pcache *metadata.ProcessCache
// workerNumber is the number of worker goroutines launched by Start.
workerNumber int
}

// NewExitEventConsumer builds an ExitEventConsumer that updates pcache and
// runs workerNumber workers when started.
func NewExitEventConsumer(pcache *metadata.ProcessCache, workerNumber int) *ExitEventConsumer {
	c := new(ExitEventConsumer)
	c.pcache = pcache
	c.workerNumber = workerNumber
	return c
}

// Start launches workerNumber goroutines that each consume events from ch and
// blocks until every one of them has returned.
func (c *ExitEventConsumer) Start(ctx context.Context, ch <-chan bpf.BpfExitEventT) {
	var workers sync.WaitGroup

	runWorker := func() {
		defer workers.Done()
		c.worker(ctx, ch)
	}

	for remaining := c.workerNumber; remaining > 0; remaining-- {
		workers.Add(1)
		go runWorker()
	}

	workers.Wait()
}

// worker handles exit events received from ch until ctx is cancelled or ch is
// closed.
func (c *ExitEventConsumer) worker(ctx context.Context, ch <-chan bpf.BpfExitEventT) {
	for {
		select {
		case <-ctx.Done():
			return
		case et, ok := <-ch:
			// A closed channel yields zero values forever; without this
			// check the worker would spin, handling bogus events for pid 0.
			if !ok {
				return
			}
			c.handleExitEvent(et)
		}
	}
}

// handleExitEvent marks the process identified by the event's Pid as dead in
// the process cache.
func (c *ExitEventConsumer) handleExitEvent(et bpf.BpfExitEventT) {
	pid := int(et.Pid)
	c.pcache.MarkDead(pid)
}

// Stop is currently a no-op.
func (c *ExitEventConsumer) Stop() {}
Loading

0 comments on commit 76e94e8

Please sign in to comment.