Skip to content

Commit

Permalink
refactor: Export cgroup paths in targets
Browse files Browse the repository at this point in the history
* This is a simplified approach to the original idea. Here we export the cgroup paths as one of the labels, and users can use a relabel component to retrieve the relevant cgroup IDs.

* In the case of cgroups v1, we export the paths of all controllers delimited by `|`, whereas in cgroups v2 there is always a single path

Signed-off-by: Mahendra Paipuri <[email protected]>
  • Loading branch information
mahendrapaipuri committed Oct 11, 2024
1 parent 516c1f1 commit 1302357
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 150 deletions.
30 changes: 3 additions & 27 deletions docs/sources/reference/components/discovery/discovery.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ The following arguments are supported:
| Name | Type | Description | Default | Required |
|--------------------|---------------------|------------------------------------------------------------------------------------------|---------|----------|
| `join` | `list(map(string))` | Join external targets to discovered processes targets based on `__container_id__` label. | | no |
| `cgroup_id_regex` | `string` | Regular expression that captures only one group that will used as cgroup ID of process. | | no |
| `refresh_interval` | `duration` | How often to sync targets. | "60s" | no |

### Targets joining
Expand Down Expand Up @@ -88,11 +87,6 @@ The resulting targets are:
]
```

### Cgroup ID regex

The `cgroup_id_regex` argument allows you to capture the cgroup ID of the process managed by other resource managers like SLURM or Libvirt.
When it is enabled, you can export the cgroup ID as the `__meta_process_cgroup_id` field.

## Blocks

The following blocks are supported inside the definition of
Expand All @@ -117,7 +111,7 @@ The following arguments are supported:
| `commandline` | `bool` | A flag to enable discovering `__meta_process_commandline` label. | true | no |
| `uid` | `bool` | A flag to enable discovering `__meta_process_uid` label. | true | no |
| `username` | `bool` | A flag to enable discovering `__meta_process_username` label. | true | no |
| `cgroup_id` | `bool` | A flag to enable discovering `__meta_cgroup_id__` label. | true | no |
| `cgroup_path` | `bool` | A flag to enable discovering `__meta_process_cgroup_path` label. | true | no |
| `container_id` | `bool` | A flag to enable discovering `__container_id__` label. | true | no |

## Exported fields
Expand All @@ -136,7 +130,7 @@ Each target includes the following labels:
* `__meta_process_commandline`: The process command line. Taken from `/proc/<pid>/cmdline`.
* `__meta_process_uid`: The process UID. Taken from `/proc/<pid>/status`.
* `__meta_process_username`: The process username. Taken from `__meta_process_uid` and `os/user/LookupID`.
* `__meta_cgroup_id`: The cgroup ID under which the process is running. This will be set only when `cgroup_regex_id` argument is passed and valid.
* `__meta_process_cgroup_path`: The cgroup path under which the process is running. Taken from `/proc/<pid>/cgroup`. In the case of cgroups v1, this label includes the paths of all controllers delimited by `|`.
* `__container_id__`: The container ID. Taken from `/proc/<pid>/cgroup`. If the process is not running in a container, this label is not set.

## Component health
Expand Down Expand Up @@ -165,25 +159,7 @@ discovery.process "all" {
commandline = true
username = true
uid = true
container_id = true
}
}
```

### Example discovering processes on a hypervisor managed by libvirt

```alloy
discovery.process "all" {
cgroup_id_regex = "^.*/(?:.+?)instance-([0-9]+)(?:.*$)"
refresh_interval = "60s"
discover_config {
cwd = true
exe = true
commandline = true
username = true
uid = true
cgroup_id = true
cgroup_path = true
container_id = true
}
}
Expand Down
5 changes: 2 additions & 3 deletions internal/component/discovery/process/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
type Arguments struct {
Join []discovery.Target `alloy:"join,attr,optional"`
RefreshInterval time.Duration `alloy:"refresh_interval,attr,optional"`
CgroupIDRegex string `alloy:"cgroup_id_regex,attr,optional"`
DiscoverConfig DiscoverConfig `alloy:"discover_config,block,optional"`
}

Expand All @@ -20,7 +19,7 @@ type DiscoverConfig struct {
Username bool `alloy:"username,attr,optional"`
UID bool `alloy:"uid,attr,optional"`
ContainerID bool `alloy:"container_id,attr,optional"`
CgroupID bool `alloy:"cgroup_id,attr,optional"`
CgroupPath bool `alloy:"cgroup_path,attr,optional"`
}

var DefaultConfig = Arguments{
Expand All @@ -31,7 +30,7 @@ var DefaultConfig = Arguments{
Exe: true,
Commandline: true,
ContainerID: true,
CgroupID: true,
CgroupPath: true,
},
}

Expand Down
24 changes: 12 additions & 12 deletions internal/component/discovery/process/cgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ package process
import (
"bufio"
"io"
"regexp"
"strings"
)

// getPathFromCGroup returns the cgroup path(s) of a process as read from
// cgroup, which is expected to hold the contents of /proc/<pid>/cgroup.
//
// On cgroups v2 (unified) the file has a single entry, which is returned
// as is. On cgroups v1 there is one entry per controller; all entries are
// joined with `|` into one string so that users can extract the path they
// are interested in with a relabel component.
//
// An empty reader yields an empty string. Read errors are not reported:
// the lines scanned before the error are returned.
func getPathFromCGroup(cgroup io.Reader) string {
	var paths []string
	scanner := bufio.NewScanner(cgroup)
	for scanner.Scan() {
		// Text returns a freshly allocated string, so the value stays
		// valid after the next call to Scan overwrites the buffer.
		paths = append(paths, scanner.Text())
	}
	return strings.Join(paths, "|")
}
87 changes: 24 additions & 63 deletions internal/component/discovery/process/cgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,87 +5,48 @@ package process
import (
"bytes"
"fmt"
"regexp"
"testing"
"time"

"github.com/grafana/alloy/internal/runtime/componenttest"
"github.com/stretchr/testify/require"
)

func TestGenericCGroupMatching(t *testing.T) {
type testcase = struct {
regex *regexp.Regexp
cgroup, expectedID string
name, cgroup, expectedPath string
}
testcases := []testcase{
{
regex: regexp.MustCompile("^.*/(?:.+?)job_([0-9]+)(?:.*$)"),
cgroup: "0::/system.slice/slurmstepd.scope/job_1446354/step_batch/user/task_0", // SLURM with cgroups v2
expectedID: "1446354",
name: "cgroups v2",
cgroup: `0::/system.slice/slurmstepd.scope/job_1446354/step_batch/user/task_0`, // cgroups v2
expectedPath: `0::/system.slice/slurmstepd.scope/job_1446354/step_batch/user/task_0`,
},
{
regex: regexp.MustCompile("^.*/(?:.+?)job_([0-9]+)(?:.*$)"),
cgroup: "6:cpuset:/slurm/uid_100/job_1446355", // SLURM with cgroups v1
expectedID: "1446355",
name: "cgroups v1",
cgroup: `12:rdma:/
11:devices:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator
10:cpuset:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator
9:blkio:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator
8:pids:/user.slice/user-118.slice/session-5.scope
7:memory:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator
6:hugetlb:/
5:net_cls,net_prio:/
4:perf_event:/
3:cpu,cpuacct:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator
2:freezer:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator
1:name=systemd:/user.slice/user-118.slice/session-5.scope`, // cgroups v1
expectedPath: "12:rdma:/|11:devices:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator|10:cpuset:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator|9:blkio:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator|8:pids:/user.slice/user-118.slice/session-5.scope|7:memory:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator|6:hugetlb:/|5:net_cls,net_prio:/|4:perf_event:/|3:cpu,cpuacct:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator|2:freezer:/machine/qemu-1-instance-00000025.libvirt-qemu/emulator|1:name=systemd:/user.slice/user-118.slice/session-5.scope",
},
{
regex: regexp.MustCompile("^.*/(?:.+?)instance-([0-9]+)(?:.*$)"),
cgroup: "0::/machine/qemu-1-instance-00000025.libvirt-qemu/emulator", // Openstack with libvirt
expectedID: "00000025",
},
{
regex: regexp.MustCompile("^.*/docker/([a-z0-9]+)(?:.*$)"),
cgroup: "4:pids:/docker/18c8e093ee0e02ce1ecee4e99590675594c72c4c8b59a7619bc79fc64ddc2fd9", // Docker
expectedID: "18c8e093ee0e02ce1ecee4e99590675594c72c4c8b59a7619bc79fc64ddc2fd9",
},
{
regex: nil,
cgroup: "4:pids:/docker/18c8e093ee0e02ce1ecee4e99590675594c72c4c8b59a7619bc79fc64ddc2fd9",
expectedID: "",
name: "empty cgroups path", // Should not happen in real cases
cgroup: "",
expectedPath: "",
},
}
for i, tc := range testcases {
t.Run(fmt.Sprintf("testcase %d %s", i, tc.cgroup), func(t *testing.T) {
cgroupID := getIDFromCGroup(bytes.NewReader([]byte(tc.cgroup)), tc.regex)
expected := tc.expectedID
t.Run(fmt.Sprintf("testcase %d %s", i, tc.name), func(t *testing.T) {
cgroupID := getPathFromCGroup(bytes.NewReader([]byte(tc.cgroup)))
expected := tc.expectedPath
require.Equal(t, expected, cgroupID)
})
}
}

func TestProcessUpdateSuccess(t *testing.T) {
var args = DefaultConfig

tc, err := componenttest.NewControllerFromID(nil, "discovery.process")
require.NoError(t, err)
go func() {
err = tc.Run(componenttest.TestContext(t), args)
require.NoError(t, err)
}()

// Sleep a short time for component to go into run state
time.Sleep(100 * time.Millisecond)

newArgs := args
newArgs.CgroupIDRegex = "^.*/docker/([a-z0-9]+)(?:.*$)"
require.NoError(t, tc.Update(newArgs))
}

func TestProcessUpdateFail(t *testing.T) {
var args = DefaultConfig
args.CgroupIDRegex = "^.*/docker/([a-z0-9]+)(?:.*$)"

tc, err := componenttest.NewControllerFromID(nil, "discovery.process")
require.NoError(t, err)
go func() {
err = tc.Run(componenttest.TestContext(t), args)
require.NoError(t, err)
}()

time.Sleep(100 * time.Millisecond)

newArgs := args
newArgs.CgroupIDRegex = `[z-a]` // Invalid regex
require.Error(t, tc.Update(newArgs))
}
29 changes: 14 additions & 15 deletions internal/component/discovery/process/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"os/user"
"path"
"regexp"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -24,7 +23,7 @@ const (
labelProcessCommandline = "__meta_process_commandline"
labelProcessUsername = "__meta_process_username"
labelProcessUID = "__meta_process_uid"
labelProcessCgroupID = "__meta_process_cgroup_id"
labelProcessCgroupPath = "__meta_process_cgroup_path"
labelProcessContainerID = "__container_id__"
)

Expand All @@ -34,13 +33,13 @@ type process struct {
cwd string
commandline string
containerID string
cgroupID string
cgroupPath string
username string
uid string
}

func (p process) String() string {
return fmt.Sprintf("pid=%s exe=%s cwd=%s commandline=%s cgroupID=%s containerID=%s", p.pid, p.exe, p.cwd, p.commandline, p.cgroupID, p.containerID)
return fmt.Sprintf("pid=%s exe=%s cwd=%s commandline=%s cgrouppath=%s containerID=%s", p.pid, p.exe, p.cwd, p.commandline, p.cgroupPath, p.containerID)
}

func convertProcesses(ps []process) []discovery.Target {
Expand All @@ -53,7 +52,7 @@ func convertProcesses(ps []process) []discovery.Target {
}

func convertProcess(p process) discovery.Target {
t := make(discovery.Target, 5)
t := make(discovery.Target, 8)
t[labelProcessID] = p.pid
if p.exe != "" {
t[labelProcessExe] = p.exe
Expand All @@ -73,13 +72,13 @@ func convertProcess(p process) discovery.Target {
if p.uid != "" {
t[labelProcessUID] = p.uid
}
if p.cgroupID != "" {
t[labelProcessCgroupID] = p.cgroupID
if p.cgroupPath != "" {
t[labelProcessCgroupPath] = p.cgroupPath
}
return t
}

func discover(l log.Logger, cfg *DiscoverConfig, cgroupIDRegexp *regexp.Regexp) ([]process, error) {
func discover(l log.Logger, cfg *DiscoverConfig) ([]process, error) {
processes, err := gopsutil.Processes()
if err != nil {
return nil, fmt.Errorf("failed to list processes: %w", err)
Expand All @@ -97,7 +96,7 @@ func discover(l log.Logger, cfg *DiscoverConfig, cgroupIDRegexp *regexp.Regexp)
for _, p := range processes {
spid := fmt.Sprintf("%d", p.Pid)
var (
exe, cwd, commandline, containerID, cgroupID, username, uid string
exe, cwd, commandline, containerID, cgroupPath, username, uid string
)
if cfg.Exe {
exe, err = p.Exe()
Expand Down Expand Up @@ -143,8 +142,8 @@ func discover(l log.Logger, cfg *DiscoverConfig, cgroupIDRegexp *regexp.Regexp)
continue
}
}
if cfg.CgroupID && cgroupIDRegexp != nil {
cgroupID, err = getLinuxProcessCgroupID(spid, cgroupIDRegexp)
if cfg.CgroupPath {
cgroupPath, err = getLinuxProcessCgroupPath(spid)
if err != nil {
loge(int(p.Pid), err)
continue
Expand All @@ -156,7 +155,7 @@ func discover(l log.Logger, cfg *DiscoverConfig, cgroupIDRegexp *regexp.Regexp)
cwd: cwd,
commandline: commandline,
containerID: containerID,
cgroupID: cgroupID,
cgroupPath: cgroupPath,
username: username,
uid: uid,
})
Expand All @@ -179,14 +178,14 @@ func getLinuxProcessContainerID(pid string) (string, error) {
return "", nil
}

func getLinuxProcessCgroupID(pid string, regexp *regexp.Regexp) (string, error) {
func getLinuxProcessCgroupPath(pid string) (string, error) {
cgroup, err := os.Open(path.Join("/proc", pid, "cgroup"))
if err != nil {
return "", err
}
defer cgroup.Close()
if cgroupID := getIDFromCGroup(cgroup, regexp); cgroupID != "" {
return cgroupID, nil
if cgroupPath := getPathFromCGroup(cgroup); cgroupPath != "" {
return cgroupPath, nil
}

return "", nil
Expand Down
Loading

0 comments on commit 1302357

Please sign in to comment.