Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #60 #59 Refactor Docker check to use go func for each container #86

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 82 additions & 51 deletions checks/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package checks
import (
"context"
"encoding/json"
"sync"

"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
log "github.com/sirupsen/logrus"

"github.com/it-novum/openitcockpit-agent-go/config"
)
Expand Down Expand Up @@ -54,63 +56,92 @@ func (c *CheckDocker) Run(ctx context.Context) (interface{}, error) {
return nil, err
}

var wg sync.WaitGroup
resultChan := make(chan *resultDocker, len(containers))
errorChan := make(chan error, len(containers))

dockerResults := make([]*resultDocker, 0)
for _, container := range containers {
response, err := cli.ContainerStats(ctx, container.ID, false)
if err != nil {
continue
}

defer response.Body.Close()
responseJson := json.NewDecoder(response.Body)

var stats *types.StatsJSON
if err := responseJson.Decode(&stats); err != nil {
continue
}
wg.Add(1)
go func(container types.Container) {
// This is a new "thread" / go func for each docker container
defer wg.Done()

response, err := cli.ContainerStats(ctx, container.ID, false)
if err != nil {
errorChan <- err
return
}

defer response.Body.Close()
responseJson := json.NewDecoder(response.Body)

var stats *types.StatsJSON
if err := responseJson.Decode(&stats); err != nil {
errorChan <- err
return
}

var bytesRead, bytesWrite uint64
var memory, memoryPercentage, cpuPercentage, networkRx, networkTx float64

// Memory and CPU usage needs to be calculated manually
// https://github.com/docker/cli/blob/a4a07c643042f4e2a75bf872f38b134502848214/cli/command/container/stats_helpers.go#L79-L128
if response.OSType != "windows" {
// Docker daemon is running in Linux or macOS
previousCPU := float64(stats.PreCPUStats.CPUUsage.TotalUsage)
previousSystem := float64(stats.PreCPUStats.SystemUsage)
cpuPercentage = c.calcCpuPercentageUnix(previousCPU, previousSystem, stats)
bytesRead, bytesWrite = c.calcDiskIoUnix(stats.BlkioStats)
networkRx, networkTx = c.calcNetworkIo(stats.Networks)
memory = c.calcMemoryUsageUnix(stats.MemoryStats)
memoryLimit := float64(stats.MemoryStats.Limit)
memoryPercentage = c.calcMemoryPercentageUnix(memoryLimit, memory)
} else {
// Docker daemon is running in Windows
cpuPercentage = c.calcCpuPercentageWindows(stats)
bytesRead = stats.StorageStats.ReadSizeBytes
bytesWrite = stats.StorageStats.WriteSizeBytes
networkRx, networkTx = c.calcNetworkIo(stats.Networks)
memory = float64(stats.MemoryStats.PrivateWorkingSet)
}

containerResult := &resultDocker{
Id: container.ID[:10],
Name: container.Names[0],
Image: container.Image,
SizeRw: container.SizeRw,
SizeRootFs: container.SizeRootFs,
State: container.State, // running
Status: container.Status, // Up 6 minutes
CpuPercentage: cpuPercentage,
MemoryPercentage: memoryPercentage,
NetworkRx: networkRx,
NetworkTx: networkTx,
DiskRead: bytesRead,
DiskWrite: bytesWrite,
MemoryUsed: memory,
}

// Return error back to check thread
resultChan <- containerResult
}(container)
}

var bytesRead, bytesWrite uint64
var memory, memoryPercentage, cpuPercentage, networkRx, networkTx float64

// Memory and CPU usage needs to be calculated manually
// https://github.com/docker/cli/blob/a4a07c643042f4e2a75bf872f38b134502848214/cli/command/container/stats_helpers.go#L79-L128
if response.OSType != "windows" {
// Docker daemon is running in Linux or macOS
previousCPU := float64(stats.PreCPUStats.CPUUsage.TotalUsage)
previousSystem := float64(stats.PreCPUStats.SystemUsage)
cpuPercentage = c.calcCpuPercentageUnix(previousCPU, previousSystem, stats)
bytesRead, bytesWrite = c.calcDiskIoUnix(stats.BlkioStats)
networkRx, networkTx = c.calcNetworkIo(stats.Networks)
memory = c.calcMemoryUsageUnix(stats.MemoryStats)
memoryLimit := float64(stats.MemoryStats.Limit)
memoryPercentage = c.calcMemoryPercentageUnix(memoryLimit, memory)
} else {
// Docker daemon is running in Windows
cpuPercentage = c.calcCpuPercentageWindows(stats)
bytesRead = stats.StorageStats.ReadSizeBytes
bytesWrite = stats.StorageStats.WriteSizeBytes
networkRx, networkTx = c.calcNetworkIo(stats.Networks)
memory = float64(stats.MemoryStats.PrivateWorkingSet)
}
// Check thread Wait for all results to come back and close the channels
go func() {
wg.Wait()
close(errorChan)
close(resultChan)
}()

containerResult := &resultDocker{
Id: container.ID[:10],
Name: container.Names[0],
Image: container.Image,
SizeRw: container.SizeRw,
SizeRootFs: container.SizeRootFs,
State: container.State, // running
Status: container.Status, // Up 6 minutes
CpuPercentage: cpuPercentage,
MemoryPercentage: memoryPercentage,
NetworkRx: networkRx,
NetworkTx: networkTx,
DiskRead: bytesRead,
DiskWrite: bytesWrite,
MemoryUsed: memory,
}
for err := range errorChan {
log.Errorln("Docker Check error: ", err)
}

dockerResults = append(dockerResults, containerResult)
for result := range resultChan {
dockerResults = append(dockerResults, result)
}

return dockerResults, nil
Expand Down