Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add root frame with executable path when available (PROF-9924) #12

Merged
merged 3 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libpf/libpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ type TraceAndCounts struct {
Comm string
PodName string
ContainerName string
PID PID
}

type FrameMetadata struct {
Expand Down
7 changes: 7 additions & 0 deletions libpf/process/coredump.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ func (cd *CoredumpProcess) GetMappingFile(_ *Mapping) string {
return ""
}

func (cd *CoredumpProcess) GetExecutablePath() (string, error) {
if cd.MainExecutable() == "" {
return "", errors.New("no main executable found")
}
return cd.MainExecutable(), nil
}

// CalculateMappingFileID implements the Process interface
func (cd *CoredumpProcess) CalculateMappingFileID(m *Mapping) (libpf.FileID, error) {
// It is not possible to calculate the real FileID as the section headers
Expand Down
4 changes: 4 additions & 0 deletions libpf/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func (sp *systemProcess) GetThreads() ([]ThreadInfo, error) {
return nil, errors.New("not implemented")
}

func (sp *systemProcess) GetExecutablePath() (string, error) {
return os.Readlink(fmt.Sprintf("/proc/%d/exe", sp.pid))
}

func (sp *systemProcess) Close() error {
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions libpf/process/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ type Process interface {
// GetMapping reads and parses process memory mappings
GetMappings() ([]Mapping, error)

// GetExecutablePath returns the path to the executable of the process
GetExecutablePath() (string, error)

// GetThread reads the process thread states
GetThreads() ([]ThreadInfo, error)

Expand Down
2 changes: 2 additions & 0 deletions processmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (d *dummyProcess) GetMappingFile(_ *process.Mapping) string {
return ""
}

func (d *dummyProcess) GetExecutablePath() (string, error) { return "", nil }

func (d *dummyProcess) CalculateMappingFileID(m *process.Mapping) (libpf.FileID, error) {
return pfelf.CalculateID(m.Path)
}
Expand Down
3 changes: 3 additions & 0 deletions processmanager/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,9 @@ func (pm *ProcessManager) SynchronizeProcess(pr process.Process) {
// Also see: Unified PID Events design doc
pm.ebpf.RemoveReportedPID(pid)
}

execPath, _ := pr.GetExecutablePath()
pm.reporter.ProcessMetadata(context.TODO(), pid, execPath)
}

// CleanupPIDs executes a periodic synchronization of pidToProcessInfo table with system processes.
Expand Down
40 changes: 39 additions & 1 deletion reporter/datadog_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ package reporter
import (
"bytes"
"context"
"fmt"
"net/url"
"os"
"path"
"strings"
"time"

Expand Down Expand Up @@ -66,6 +68,9 @@ type DatadogReporter struct {

// frames maps frame information to its source location.
frames *lru.SyncedLRU[libpf.FileID, map[libpf.AddressOrLineno]sourceInfo]

// execPathes stores the last known execPath for a PID.
execPathes *lru.SyncedLRU[libpf.PID, string]
}

// ReportFramesForTrace accepts a trace with the corresponding frames
Expand Down Expand Up @@ -94,7 +99,7 @@ func (r *DatadogReporter) ReportFramesForTrace(trace *libpf.Trace) {
// caches this information.
// nolint: dupl
func (r *DatadogReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32,
count uint16, comm, podName, containerName string) {
count uint16, comm, podName, containerName string, pid libpf.PID) {
if v, exists := r.traces.Peek(traceHash); exists {
// As traces is filled from two different API endpoints,
// some information for the trace might be available already.
Expand All @@ -103,13 +108,15 @@ func (r *DatadogReporter) ReportCountForTrace(traceHash libpf.TraceHash, timesta
v.comm = comm
v.podName = podName
v.containerName = containerName
v.pid = pid

r.traces.Add(traceHash, v)
} else {
r.traces.Add(traceHash, traceInfo{
comm: comm,
podName: podName,
containerName: containerName,
pid: pid,
})
}

Expand Down Expand Up @@ -144,6 +151,10 @@ func (r *DatadogReporter) ExecutableMetadata(_ context.Context,
})
}

func (r *DatadogReporter) ProcessMetadata(_ context.Context, pid libpf.PID, execPath string) {
r.execPathes.Add(pid, execPath)
}

// FrameMetadata accepts metadata associated with a frame and caches this information.
func (r *DatadogReporter) FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno,
lineNumber libpf.SourceLineno, functionOffset uint32, functionName, filePath string) {
Expand Down Expand Up @@ -241,6 +252,11 @@ func StartDatadog(mainCtx context.Context, c *Config) (Reporter, error) {
return nil, err
}

execPathes, err := lru.NewSynced[libpf.PID, string](cacheSize, libpf.PID.Hash32)
if err != nil {
return nil, err
}

// Next step: Dynamically configure the size of this LRU.
// Currently we use the length of the JSON array in
// hostmetadata/hostmetadata.json.
Expand All @@ -261,6 +277,7 @@ func StartDatadog(mainCtx context.Context, c *Config) (Reporter, error) {
executables: executables,
frames: frames,
hostmetadata: hostmetadata,
execPathes: execPathes,
}

// Create a child context for reporting features
Expand Down Expand Up @@ -511,6 +528,23 @@ func (r *DatadogReporter) getPprofProfile() (profile *pprofile.Profile,
sample.Location = append(sample.Location, loc)
}

execPath, _ := r.execPathes.Get(trace.pid)

// Check if the last frame is a kernel frame.
if trace.frameTypes[len(trace.frameTypes)-1] == libpf.KernelFrame {
// If the last frame is a kernel frame, we need to add a dummy
// location with the kernel as the function name.
execPath = "kernel"
}

if execPath != "" {
base := path.Base(execPath)
loc := createPProfLocation(profile, 0)
m := createPprofFunctionEntry(funcMap, profile, base, execPath)
loc.Line = append(loc.Line, pprofile.Line{Function: m})
sample.Location = append(sample.Location, loc)
}

sample.Label = make(map[string][]string)
addTraceLabels(sample.Label, trace)

Expand Down Expand Up @@ -568,6 +602,10 @@ func addTraceLabels(labels map[string][]string, i traceInfo) {
if i.apmServiceName != "" {
labels["apmServiceName"] = append(labels["apmServiceName"], i.apmServiceName)
}

if i.pid != 0 {
labels["process_id"] = append(labels["process_id"], fmt.Sprintf("%d", i.pid))
}
}

// getDummyMappingIndex inserts or looks up a dummy entry for interpreted FileIDs.
Expand Down
6 changes: 5 additions & 1 deletion reporter/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type TraceReporter interface {
// ReportCountForTrace accepts a hash of a trace with a corresponding count and
// caches this information before a periodic reporting to the backend.
ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32,
count uint16, comm, podName, containerName string)
count uint16, comm, podName, containerName string, pid libpf.PID)
}

type SymbolReporter interface {
Expand All @@ -63,6 +63,10 @@ type SymbolReporter interface {
// a periodic reporting to the backend.
FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno,
lineNumber libpf.SourceLineno, functionOffset uint32, functionName, filePath string)

// ProcessMetadata accepts metadata associated with a process and caches this information
// before a periodic reporting to the backend.
ProcessMetadata(ctx context.Context, pid libpf.PID, exe string)
}

type HostMetadataReporter interface {
Expand Down
22 changes: 21 additions & 1 deletion reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type traceInfo struct {
podName string
containerName string
apmServiceName string
pid libpf.PID
}

// sample holds dynamic information about traces.
Expand Down Expand Up @@ -101,6 +102,9 @@ type OTLPReporter struct {

// frames maps frame information to its source location.
frames *lru.SyncedLRU[libpf.FileID, map[libpf.AddressOrLineno]sourceInfo]

// execPathes stores the last known execPath for a PID.
execPathes *lru.SyncedLRU[libpf.PID, string]
}

// hashString is a helper function for LRUs that use string as a key.
Expand Down Expand Up @@ -136,7 +140,7 @@ func (r *OTLPReporter) ReportFramesForTrace(trace *libpf.Trace) {
// caches this information.
// nolint: dupl
func (r *OTLPReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32,
count uint16, comm, podName, containerName string) {
count uint16, comm, podName, containerName string, pid libpf.PID) {
if v, exists := r.traces.Peek(traceHash); exists {
// As traces is filled from two different API endpoints,
// some information for the trace might be available already.
Expand All @@ -145,13 +149,15 @@ func (r *OTLPReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp
v.comm = comm
v.podName = podName
v.containerName = containerName
v.pid = pid

r.traces.Add(traceHash, v)
} else {
r.traces.Add(traceHash, traceInfo{
comm: comm,
podName: podName,
containerName: containerName,
pid: pid,
})
}

Expand Down Expand Up @@ -186,6 +192,10 @@ func (r *OTLPReporter) ExecutableMetadata(_ context.Context,
})
}

func (r *OTLPReporter) ProcessMetadata(_ context.Context, pid libpf.PID, execPath string) {
r.execPathes.Add(pid, execPath)
}

// FrameMetadata accepts metadata associated with a frame and caches this information.
func (r *OTLPReporter) FrameMetadata(fileID libpf.FileID, addressOrLine libpf.AddressOrLineno,
lineNumber libpf.SourceLineno, functionOffset uint32, functionName, filePath string) {
Expand Down Expand Up @@ -732,6 +742,16 @@ func getTraceLabels(stringMap map[string]uint32, i traceInfo) []*pprofextended.L
})
}

if i.pid != 0 {
pidIdx := getStringMapIndex(stringMap, "process_id")
pidValueIdx := getStringMapIndex(stringMap, fmt.Sprintf("%d", i.pid))

labels = append(labels, &pprofextended.Label{
Gandem marked this conversation as resolved.
Show resolved Hide resolved
Key: int64(pidIdx),
Str: int64(pidValueIdx),
})
}

return labels
}

Expand Down
17 changes: 16 additions & 1 deletion reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type GRPCReporter struct {
hostMetadataQueue fifoRingBuffer[*HostMetadata]
// fallbackSymbolsQueue is a ring buffer based FIFO for *fallbackSymbol
fallbackSymbolsQueue fifoRingBuffer[*fallbackSymbol]
// execPathesQueue stores the last known execPath for a PID.
execPathesQueue fifoRingBuffer[*processMetadata]
}

// Assert that we implement the full Reporter interface.
Expand All @@ -109,6 +111,11 @@ type executableMetadata struct {
buildID string
}

type processMetadata struct {
pid libpf.PID
execPath string
}

// ExecutableMetadata implements the SymbolReporter interface.
func (r *GRPCReporter) ExecutableMetadata(ctx context.Context, fileID libpf.FileID,
fileName, buildID string) {
Expand All @@ -124,6 +131,13 @@ func (r *GRPCReporter) ExecutableMetadata(ctx context.Context, fileID libpf.File
}
}

func (r *GRPCReporter) ProcessMetadata(_ context.Context, pid libpf.PID, execPath string) {
r.execPathesQueue.append(&processMetadata{
pid: pid,
execPath: execPath,
})
}

// FrameMetadata implements the SymbolReporter interface.
func (r *GRPCReporter) FrameMetadata(fileID libpf.FileID,
addressOrLine libpf.AddressOrLineno, lineNumber libpf.SourceLineno, functionOffset uint32,
Expand All @@ -140,14 +154,15 @@ func (r *GRPCReporter) FrameMetadata(fileID libpf.FileID,

// ReportCountForTrace implements the TraceReporter interface.
func (r *GRPCReporter) ReportCountForTrace(traceHash libpf.TraceHash, timestamp libpf.UnixTime32,
count uint16, comm, podName, containerName string) {
count uint16, comm, podName, containerName string, pid libpf.PID) {
r.countsForTracesQueue.append(&libpf.TraceAndCounts{
Hash: traceHash,
Timestamp: timestamp,
Count: count,
Comm: comm,
PodName: podName,
ContainerName: containerName,
PID: pid,
})
}

Expand Down
4 changes: 2 additions & 2 deletions tracehandler/tracehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
if traceKnown {
m.bpfTraceCacheHit++
m.reporter.ReportCountForTrace(postConvHash, timestamp, 1,
bpfTrace.Comm, meta.PodName, meta.ContainerName)
bpfTrace.Comm, meta.PodName, meta.ContainerName, bpfTrace.PID)
return
}
m.bpfTraceCacheMiss++
Expand All @@ -155,7 +155,7 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
log.Debugf("Trace hash remap 0x%x -> 0x%x", bpfTrace.Hash, umTrace.Hash)
m.bpfTraceCache.Add(bpfTrace.Hash, umTrace.Hash)
m.reporter.ReportCountForTrace(umTrace.Hash, timestamp, 1,
bpfTrace.Comm, meta.PodName, meta.ContainerName)
bpfTrace.Comm, meta.PodName, meta.ContainerName, bpfTrace.PID)

// Trace already known to collector by UM hash?
if _, known := m.umTraceCache.Get(umTrace.Hash); known {
Expand Down
2 changes: 1 addition & 1 deletion tracehandler/tracehandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *mockReporter) ReportFramesForTrace(trace *libpf.Trace) {
}

func (m *mockReporter) ReportCountForTrace(traceHash libpf.TraceHash,
_ libpf.UnixTime32, count uint16, _, _, _ string) {
_ libpf.UnixTime32, count uint16, _, _, _ string, _ libpf.PID) {
m.reportedCounts = append(m.reportedCounts, reportedCount{
traceHash: traceHash,
count: count,
Expand Down
2 changes: 2 additions & 0 deletions utils/coredump/coredump.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func (c *symbolizationCache) FrameMetadata(fileID libpf.FileID,

func (c *symbolizationCache) ReportFallbackSymbol(libpf.FrameID, string) {}

func (c *symbolizationCache) ProcessMetadata(_ context.Context, _ libpf.PID, _ string) {}

func generateErrorMap() (map[libpf.AddressOrLineno]string, error) {
file, err := os.Open("../errors-codegen/errors.json")
if err != nil {
Expand Down
Loading