diff --git a/.gitignore b/.gitignore index 40aee91..17070d1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ mod_gearman_worker send_gearman +gearman_top +check_gearman *.exe *.linux.amd64 *.swp @@ -11,3 +13,11 @@ tools/ go.work go.work.sum test*.pl +/.idea/.gitignore +/cmd/.idea/cmd.iml +/.idea/mod-gearman-worker-go.iml +/.idea/modules.xml +/cmd/.idea/modules.xml +/.idea/vcs.xml +/cmd/.idea/vcs.xml +/cmd/.idea/workspace.xml diff --git a/.golangci.yml b/.golangci.yml index 4021636..bb1baa1 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -96,6 +96,7 @@ issues: - "Metric: modgearmanworker_workers_total Error: non-counter metrics" - 'local replacement are not allowed: pkg/' - "fieldalignment: struct" + - "Function 'PrintUsageCheckGearman' has too many statements" exclude-rules: # Exclude some linters from running on tests files. - path: _test.*\.go diff --git a/Changes b/Changes index a5e5749..b33f414 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,8 @@ This file documents the revision history for the Mod-Gearman-Worker-Go +next: + - add worker_name_in_result option + 1.5.1 Thu Mar 21 17:43:21 CET 2024 - update internal check_nsc_web to v0.7.2 - minimum go version is now 1.22 diff --git a/cmd/check_gearman/main.go b/cmd/check_gearman/main.go new file mode 100644 index 0000000..583cf9c --- /dev/null +++ b/cmd/check_gearman/main.go @@ -0,0 +1,13 @@ +package main + +import ( + "github.com/consol-monitoring/mod-gearman-worker-go/pkg/modgearman" +) + +// Build contains the current git commit id +// compile passing -ldflags "-X main.Build " to set the id. +var Build string + +func main() { + modgearman.CheckGearman(Build) +} diff --git a/cmd/gearman_top/main.go b/cmd/gearman_top/main.go new file mode 100644 index 0000000..e0e90de --- /dev/null +++ b/cmd/gearman_top/main.go @@ -0,0 +1,13 @@ +package main + +import ( + "github.com/consol-monitoring/mod-gearman-worker-go/pkg/modgearman" +) + +// Build contains the current git commit id +// compile passing -ldflags "-X main.Build " to set the id. +var Build string + +func main() { + modgearman.GearmanTop(Build) +} diff --git a/go.mod b/go.mod index 2e4956a..fada8ad 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,21 @@ module github.com/consol-monitoring/mod-gearman-worker-go -go 1.22 +go 1.22.0 -toolchain go1.22.1 +toolchain go1.22.2 require ( github.com/appscode/g2 v0.0.0-20190123131438-388ba74fd273 github.com/consol-monitoring/check_nsc_web/pkg/checknscweb v0.0.0-20240321161425-fd9209e96e1f github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/kdar/factorlog v0.0.0-20211012144011-6ea75a169038 + github.com/nsf/termbox-go v1.1.1 github.com/prometheus/client_golang v1.20.2 github.com/sevlyar/go-daemon v0.1.6 github.com/sni/shelltoken v0.0.0-20240314123449-84b0a0c05450 + github.com/stretchr/testify v1.9.0 ) -require github.com/stretchr/testify v1.9.0 - require ( github.com/appscode/go v0.0.0-20201105063637-5613f3b8169f // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -27,12 +27,14 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.58.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/rivo/uniseg v0.4.7 // indirect golang.org/x/sys v0.24.0 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5 // indirect diff --git a/go.sum b/go.sum index 1d3bca8..b252560 100644 --- a/go.sum +++ b/go.sum @@ -52,6 +52,9 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI= github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -59,6 +62,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nsf/termbox-go v1.1.1 h1:nksUPLCb73Q++DwbYUBEglYBRPZyoXJdrj5L+TkjyZY= +github.com/nsf/termbox-go v1.1.1/go.mod h1:T0cTdVuOwf7pHQNtfhnEbzHbcNyCEcVU4YPpouCbVxo= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= @@ -71,6 +76,9 @@ github.com/prometheus/common v0.58.0 h1:N+N8vY4/23r6iYfD3UQZUoJPnUYAo7v6LG5XZxjZ github.com/prometheus/common v0.58.0/go.mod h1:GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/sevlyar/go-daemon v0.1.6 h1:EUh1MDjEM4BI109Jign0EaknA2izkOyi0LV3ro3QQGs= diff --git a/pkg/modgearman/check_gearman.go b/pkg/modgearman/check_gearman.go new file mode 100644 index 0000000..d73188d --- /dev/null +++ b/pkg/modgearman/check_gearman.go @@ -0,0 +1,436 @@ +package modgearman + +import ( + "flag" + "fmt" + "net" + "os" + "strings" + "time" + + "github.com/appscode/g2/client" +) + +const ( + stateOk = 0 + stateWarning = 1 + stateCritical = 2 + stateUnknown = 3 + + pluginName = "check_gearman" +) + +type checkGmArgs struct { + Usage bool + Verbose bool + Version bool + Timeout int + JobWarning int + JobCritical int + WorkerWarning int + WorkerCritical int + Host string + TextToSend string + SendAsync bool + TextToExpect string + Queue string + UniqueID string + CritZeroWorker int +} + +type serverCheckData struct { + RC int + Message string + Checked int + TotalRunning int + TotalWaiting int + Version string +} + +type responseData struct { + statusCode int + response string +} + +type checkGearmanIDGen struct{} + +const ( + defaultTimeout = 10 + defaultJobWarning + defaultJobCritical = 100 + defaultWorkerWarning = 25 + defaultWorkerCritical = 50 + defaultCritZeroWorker = 1 +) + +func CheckGearman(build string) { + args := &checkGmArgs{} + // Define a new FlagSet for avoiding collisions with other flags + flagSet := flag.NewFlagSet("check_gearman", flag.ExitOnError) + flagSet.Usage = func() { printUsageCheckGearman(args) } + + flagSet.StringVar(&args.Host, "H", "", "hostname") + flagSet.IntVar(&args.Timeout, "t", defaultTimeout, "timeout in seconds") + flagSet.IntVar(&args.JobWarning, "w", defaultJobWarning, "job warning level") + flagSet.IntVar(&args.JobCritical, "c", defaultJobCritical, "job critical level") + flagSet.BoolVar(&args.Verbose, "v", false, "verbose output") + flagSet.BoolVar(&args.Version, "V", false, "print version") + flagSet.IntVar(&args.WorkerWarning, "W", defaultWorkerWarning, "worker warning level") + flagSet.IntVar(&args.WorkerCritical, "C", defaultWorkerCritical, "worker warning level") + flagSet.StringVar(&args.TextToSend, "s", "", "text to send") + flagSet.StringVar(&args.TextToExpect, "e", "", "text to expect") + flagSet.BoolVar(&args.SendAsync, "a", false, "send async - will ignore") + flagSet.StringVar(&args.Queue, "q", "", "queue") + flagSet.StringVar(&args.UniqueID, "u", "", "unique job id") + flagSet.IntVar(&args.CritZeroWorker, "x", defaultCritZeroWorker, "text to expect") + + // Parse the flags in the custom FlagSet + err := flagSet.Parse(os.Args[1:]) + if err != nil { + fmt.Fprintf(os.Stderr, "Error parsing flags -> %s", err.Error()) + os.Exit(1) + } + + if args.Version { + printVersionCheckGearman(build) + + os.Exit(stateUnknown) + } + + if args.Host == "" { + fmt.Fprintf(os.Stderr, "Error - no hostname given\n\n") + printUsageCheckGearman(args) + + return + } + + if args.TextToSend != "" && args.Queue == "" { + fmt.Fprintf(os.Stderr, "Error - need queue (-q) when sending job\n\n") + printUsageCheckGearman(args) + + return + } + + statusChan := make(chan int) + + go func() { + if args.TextToSend != "" { + // Using default global timeout instead on relying on library implementation of timeout + statusChan <- checkWorker(args) + } else { + statusChan <- checkServer(args) + } + }() + + var statusCode int + select { + case statusCode = <-statusChan: + case <-time.After(time.Duration(args.Timeout) * time.Second): + fmt.Fprintf(os.Stderr, "%s CRITICAL - timed out\n", pluginName) + statusCode = stateCritical + } + + os.Exit(statusCode) +} + +func checkWorker(args *checkGmArgs) int { + args.UniqueID = ternary(args.UniqueID == "", args.UniqueID, "check") + + res := responseData{ + statusCode: stateOk, + response: "", + } + + err := createWorkerJob(args, &res) + if err != nil { + fmt.Fprintf(os.Stderr, "%s CRITICAL - job failed: %s\n", pluginName, err) + + return res.statusCode + } + + if args.Verbose { + fmt.Fprintf(os.Stdout, "%s\n", res.response) + } + + if !args.SendAsync && args.TextToExpect != "" && res.response != "" { + if strings.Contains(res.response, args.TextToExpect) { + fmt.Fprintf(os.Stdout, "%s OK - send worker: '%s' response: '%s'\n", + pluginName, + args.TextToSend, + res.response, + ) + } else { + fmt.Fprintf(os.Stdout, "%s CRITICAL - send worker: '%s' response: '%s', expected '%s'\n", + pluginName, + args.TextToSend, + res.response, + args.TextToExpect, + ) + + res.statusCode = stateCritical + + return res.statusCode + } + + return res.statusCode + } + + // If result starts with a number followed by a colon, use this as exit code + if res.response != "" && len(res.response) > 1 && res.response[1] == ':' { + res.statusCode = int(res.response[0] - '0') + res.response = res.response[2:] + fmt.Fprintf(os.Stdout, "%s\n", res.response) + + return res.statusCode + } + + fmt.Fprintf(os.Stdout, "%s OK - %s\n", pluginName, res.response) + + return res.statusCode +} + +func createWorkerJob(args *checkGmArgs, res *responseData) (err error) { + // Unique id for all tasks is just "check" because it's the main task performed and helps with performance in neamon + client.IdGen = &checkGearmanIDGen{} + + if args.SendAsync { + res.response = "sending background job succeeded" + _, err = sendWorkerJobBg(args) + if err != nil { + res.statusCode = stateCritical + + return + } + } else { + res.response, err = sendWorkerJob(args) + if err != nil { + res.statusCode = stateCritical + + return + } + } + + return +} + +func (*checkGearmanIDGen) Id() string { //nolint // Function name needed for satisfying interface in g2 lib + return "check" +} + +func checkServer(args *checkGmArgs) (statusCode int) { + queueList, version, err := getServerQueues(args.Host) + if err != nil { + statusCode = stateCritical + fmt.Fprintf(os.Stderr, "%s\n", err) + + return + } + + serverData := serverCheckData{ + RC: stateOk, + Message: "", + Checked: 0, + TotalRunning: 0, + TotalWaiting: 0, + Version: version, + } + + statusCode = processServerData(queueList, &serverData, args) + printData(&serverData, queueList, args) + + return +} + +func getServerQueues(server string) ([]queue, string, error) { + hostName := extractHostName(server) + port, err := determinePort(server) + if err != nil { + return nil, "", err + } + serverAddress := fmt.Sprintf("%s:%d", hostName, port) + + connectionMap := map[string]net.Conn{} + queueList, version, err := processGearmanQueues(serverAddress, connectionMap) + if err != nil || len(queueList) == 0 { + return queueList, "", err + } + + return queueList, version, nil +} + +func processServerData(queueList []queue, data *serverCheckData, args *checkGmArgs) int { + data.RC = stateOk + + for _, element := range queueList { + if args.Queue != "" && args.Queue != element.Name { + continue + } + data.Checked++ + data.TotalRunning += element.Running + data.TotalWaiting += element.Waiting + + switch { + case element.Waiting > 0 && element.AvailWorker == 0: + data.RC = stateCritical + data.Message = fmt.Sprintf("Queue %s has %d job%s without any worker. ", + element.Name, + element.Waiting, + ternary(element.Waiting > 1, "s", ""), + ) + case args.JobCritical > 0 && element.Waiting >= args.JobCritical: + data.RC = stateCritical + data.Message = fmt.Sprintf("Queue %s has %d waiting job%s. ", + element.Name, + element.Waiting, + ternary(element.Waiting > 1, "s", ""), + ) + case args.WorkerCritical > 0 && element.AvailWorker >= args.WorkerCritical: + data.RC = stateCritical + data.Message = fmt.Sprintf("Queue %s has %d worker. ", + element.Name, + element.AvailWorker, + ) + case args.CritZeroWorker == 1 && element.AvailWorker == 0: + data.RC = stateCritical + data.Message = fmt.Sprintf("Queue %s has no worker. ", element.Name) + case args.JobWarning > 0 && element.Waiting >= args.JobWarning: + data.RC = stateWarning + data.Message = fmt.Sprintf("Queue %s has %d waiting job%s. ", + element.Name, + element.Waiting, + ternary(element.Waiting > 1, "s", ""), + ) + case args.WorkerWarning > 0 && element.AvailWorker >= args.WorkerWarning: + data.RC = stateWarning + data.Message = fmt.Sprintf("Queue %s has %d worker. ", element.Name, element.AvailWorker) + } + } + + if args.Queue == "" && data.Checked == 0 { + data.RC = stateWarning + data.Message = fmt.Sprintf("Queue %s not found", args.Queue) + } + + return data.RC +} + +func printData(data *serverCheckData, queueList []queue, args *checkGmArgs) { + fmt.Fprintf(os.Stdout, "%s ", pluginName) + switch data.RC { + case stateOk: + fmt.Fprintf(os.Stdout, "OK - %d job%s running and %d job%s waiting. Version: %s", + data.TotalRunning, + ternary(data.TotalRunning == 1, "", "s"), + data.TotalWaiting, + ternary(data.TotalWaiting == 1, "", "s"), + data.Version, + ) + case stateWarning: + fmt.Fprintf(os.Stdout, "WARNING - ") + case stateCritical: + fmt.Fprintf(os.Stdout, "CRITICAL - ") + case stateUnknown: + fmt.Fprintf(os.Stdout, "UNKNOWN - ") + } + fmt.Fprintf(os.Stdout, "%s", data.Message) + + // Print performance data + if len(queueList) > 0 { + fmt.Fprintf(os.Stdout, "|") + for _, element := range queueList { + if args.Queue != "" && args.Queue != element.Name { + continue + } + fmt.Fprintf(os.Stdout, "'%s_waiting'=%d;%d;%d;0 '%s_running'=%d '%s_worker'=%d;%d;%d;0 ", + element.Name, + element.Waiting, + args.JobWarning, + args.JobCritical, + element.Name, + element.Running, + element.Name, + element.AvailWorker, + args.WorkerWarning, + args.WorkerCritical, + ) + } + } + + fmt.Fprintf(os.Stdout, "\n") +} + +func printVersionCheckGearman(build string) { + config := &config{binary: "check_gearman", build: build} + printVersion(config) +} + +func printUsageCheckGearman(args *checkGmArgs) { + fmt.Fprintf(os.Stdout, "usage:\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "check_gearman [ -H=[:port] ]\n") + fmt.Fprintf(os.Stdout, " [ -t= ]\n") + fmt.Fprintf(os.Stdout, " [ -w= ] default: %d\n", args.JobWarning) + fmt.Fprintf(os.Stdout, " [ -c= ] default: %d\n", args.JobCritical) + fmt.Fprintf(os.Stdout, " [ -W= ] default: %d\n", args.WorkerWarning) + fmt.Fprintf(os.Stdout, " [ -C= ] default: %d\n", args.WorkerCritical) + fmt.Fprintf(os.Stdout, " [ -q= ]\n") + fmt.Fprintf(os.Stdout, " [ -x= ] default: %d\n", args.CritZeroWorker) + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "to send a test job:\n") + fmt.Fprintf(os.Stdout, " [ -u= ] default: check\n") + fmt.Fprintf(os.Stdout, " [ -s= ]\n") + fmt.Fprintf(os.Stdout, " [ -e= ]\n") + fmt.Fprintf(os.Stdout, " [ -a send async ] will ignore -e\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, " [ -h print help ]\n") + fmt.Fprintf(os.Stdout, " [ -v verbose output ]\n") + fmt.Fprintf(os.Stdout, " [ -V print version ]\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, " - You may set thresholds to 0 to disable them.\n") + fmt.Fprintf(os.Stdout, " - You may use -x to enable critical exit if there is no worker for specified queue.\n") + fmt.Fprintf(os.Stdout, " - Thresholds are only for server checks, worker checks are availability only\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "perfdata format when checking job server:\n") + fmt.Fprintf(os.Stdout, " 'queue waiting'=current waiting jobs;warn;crit;0 'queue running'=current running jobs "+ + "'queue worker'=current num worker;warn;crit;0\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "Note: set your pnp RRD_STORAGE_TYPE to MULTIPLE to support changeing numbers of queues.\n") + fmt.Fprintf(os.Stdout, " see http://docs.pnp4nagios.org/de/pnp-0.6/tpl_custom for detailed information\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "perfdata format when checking mod gearman worker:\n") + fmt.Fprintf(os.Stdout, " worker=10 jobs=1508c\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "Note: Job thresholds are per queue not totals.\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "Examples:\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "Check job server:\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "%%>./check_gearman -H localhost -q host\n") + fmt.Fprintf(os.Stdout, "check_gearman OK - 0 jobs running and 0 jobs waiting. Version: 0.14\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "Check worker:\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "%%> ./check_gearman -H -q worker_ -t 10 -s check\n") + fmt.Fprintf(os.Stdout, "check_gearman OK - host has 5 worker and is working on 0 jobs\n") + fmt.Fprintf(os.Stdout, "%%> ./check_gearman -H -q perfdata -t 10 -x\n") + fmt.Fprintf(os.Stdout, "check_gearman CRITICAL - Queue perfdata has 155 jobs without any worker. "+ + "|'perfdata_waiting'=155;10;100;0 'perfdata_running'=0 'perfdata_worker'=0;25;50;0\n") + fmt.Fprintf(os.Stdout, "\n") + fmt.Fprintf(os.Stdout, "Check result worker:\n") + fmt.Fprintf(os.Stdout, "%%> ./check_gearman -H -q check_results -t 10 -s check\n") + fmt.Fprintf(os.Stdout, "OK - result worker running on host. Sending 14.9 jobs/s (avg duration:0.040ms). "+ + "Version: 4.0.3|worker=3;;;0;3 avg_submit_duration=0.000040s;;;0;0.000429 jobs=2388c errors=0c\n") + fmt.Fprintf(os.Stdout, "\n") +} + +/* Helper function */ +//nolint:ireturn,nolintlint // Syntactic sugar +func ternary[T any](condition bool, trueVal, falseVal T) T { + if condition { + return trueVal + } + + return falseVal +} diff --git a/pkg/modgearman/client.go b/pkg/modgearman/client.go index af791f7..1aa5727 100644 --- a/pkg/modgearman/client.go +++ b/pkg/modgearman/client.go @@ -2,6 +2,7 @@ package modgearman import ( "fmt" + "os" "github.com/appscode/g2/client" "github.com/appscode/g2/pkg/runtime" @@ -13,11 +14,11 @@ import ( */ func sendAnswer(currentClient *client.Client, answer *answer, server string, encrypted bool) (*client.Client, error) { if currentClient == nil { - cl, err := client.New("tcp", server) + cl1, err := client.New("tcp", server) if err != nil { return nil, fmt.Errorf("client: %w", err) } - currentClient = cl + currentClient = cl1 } byteAnswer := createAnswer(answer, encrypted) @@ -30,3 +31,52 @@ func sendAnswer(currentClient *client.Client, answer *answer, server string, enc return currentClient, nil } + +func sendWorkerJobBg(args *checkGmArgs) (string, error) { + cl1, err := client.New("tcp", args.Host) + if err != nil { + err = fmt.Errorf("%s UNKNOWN - cannot create gearman client", pluginName) + + return "", err + } + defer cl1.Close() + + ret, taskErr := cl1.DoBg(args.Queue, []byte(args.TextToSend), runtime.JobHigh) + if taskErr != nil { + taskErr = fmt.Errorf("%w", taskErr) + + return "", taskErr + } + + return ret, nil +} + +func sendWorkerJob(args *checkGmArgs) (string, error) { + cl1, err := client.New("tcp", args.Host) + if err != nil { + return "", fmt.Errorf("%s UNKNOWN - cannot create gearman client", pluginName) + } + defer cl1.Close() + + ansChan := make(chan string) + + jobHandler := func(resp *client.Response) { + data, err := resp.Result() + ansChan <- string(data) + if err != nil { + fmt.Fprintf(os.Stderr, "Error, %s\n", err) + + return + } + } + + _, taskErr := cl1.Do(args.Queue, []byte(args.TextToSend), runtime.JobHigh, jobHandler) + if taskErr != nil { + taskErr = fmt.Errorf("%w", taskErr) + + return "", taskErr + } + response := <-ansChan + + return response, nil +} diff --git a/pkg/modgearman/gearman_top.go b/pkg/modgearman/gearman_top.go new file mode 100644 index 0000000..adfcf84 --- /dev/null +++ b/pkg/modgearman/gearman_top.go @@ -0,0 +1,353 @@ +package modgearman + +import ( + "flag" + "fmt" + "net" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/consol-monitoring/mod-gearman-worker-go/pkg/utils" + "github.com/nsf/termbox-go" +) + +type gmTopArgs struct { + Usage bool + Verbose bool + Version bool + Host string + Quiet bool + Interval float64 + Batch bool + Hosts []string +} + +type dataRow struct { + queueName string + workerAvailable string + jobsWaiting string + jobsRunning string +} + +const ( + connTimeout = 10 +) + +func GearmanTop(build string) { + args := &gmTopArgs{} + // Define a new FlagSet for avoiding collisions with other flags + flagSet := flag.NewFlagSet("gearman_top", flag.ExitOnError) + + flagSet.BoolVar(&args.Usage, "h", false, "Print usage") + flagSet.BoolVar(&args.Version, "V", false, "Print version") + flagSet.BoolVar(&args.Quiet, "q", false, "Quiet mode") + flagSet.BoolVar(&args.Batch, "b", false, "Batch mode") + flagSet.BoolVar(&args.Verbose, "v", false, "Verbose output") + flagSet.Float64Var(&args.Interval, "i", 1.0, "Set interval") + flagSet.Func("H", "Add host", func(host string) error { + return add2HostList(host, &args.Hosts) + }) + + // Parse the flags in the custom FlagSet + err := flagSet.Parse(os.Args[1:]) + if err != nil { + fmt.Fprintf(os.Stderr, "Error parsing flags -> %s", err.Error()) + os.Exit(1) + } + + implementLogger() + + if args.Usage { + printTopUsage() + + return + } + if args.Version { + printTopVersion(build) + + return + } + + hostList := createHostList(args.Hosts) + + // Map with active connections to the hosts in order to maintain a connection + // instead of creating a new connection on every iteration + connectionMap := make(map[string]net.Conn) + + if args.Batch { + printInBatchMode(hostList, connectionMap) + + return + } + + initializeTermbox() + defer termbox.Close() + + // Print stats for host in a loop + runInteractiveMode(args, hostList, connectionMap) +} + +func createHostList(hostList []string) []string { + if len(hostList) == 0 { + hostList = append(hostList, "localhost") + } else { + hostList = unique(hostList) + } + + return hostList +} + +func runInteractiveMode(args *gmTopArgs, hostList []string, connectionMap map[string]net.Conn) { + eventQueue := make(chan termbox.Event) + go func() { + for { + eventQueue <- termbox.PollEvent() + } + }() + + ticker := time.NewTicker(time.Duration(args.Interval * float64(time.Second))) + defer ticker.Stop() + + tableChan := make(chan map[string]string) + printMap := make(map[string]string) + var mutex sync.Mutex + + // Initialize printMap with placeholders + for _, host := range hostList { + printMap[host] = fmt.Sprintf("---- %s ----\nNot data yet...\n\n\n", host) + } + + // Get and print stats for all hosts in parallel in order to prevent a program block + // when a connection to a host runs into a timeout + printHostsInParallel(hostList, connectionMap, tableChan, args.Interval) + + // Print once before the ticker ticks for the first time + initPrint(&mutex, printMap, hostList, tableChan) + + for { + select { + case ev := <-eventQueue: + if ev.Type == termbox.EventKey && (ev.Key == termbox.KeyEsc || ev.Ch == 'q' || ev.Ch == 'Q' || ev.Key == termbox.KeyCtrlC) { + // Close all active connections + for key := range connectionMap { + if connectionMap[key] != nil { + err := connectionMap[key].Close() + if err != nil { + fmt.Fprintf(os.Stdout, "Error closing connection %v\n", err) + } + } + } + + return + } + case <-ticker.C: + printHosts(&mutex, hostList, printMap) + /* If a new stat is available all stats are transferred into the printMap + which maintains the order right order of the called hosts and assigns the + correct string (table) that should be printed */ + case tables := <-tableChan: + mutex.Lock() + for host, table := range tables { + printMap[host] = table + } + mutex.Unlock() + } + } +} + +func initializeTermbox() { + err := termbox.Init() + if err != nil { + panic(err) + } +} + +func implementLogger() { + cfg := &config{} + cfg.setDefaultValues() + createLogger(cfg) +} + +func printInBatchMode(hostList []string, connectionMap map[string]net.Conn) { + currTime := time.Now().Format("2006-01-02 15:04:05") + fmt.Fprintf(os.Stdout, "%s\n\n", currTime) + for _, host := range hostList { + fmt.Fprintln(os.Stdout, generateQueueTable(host, connectionMap)) + } +} + +func initPrint(mutex *sync.Mutex, printMap map[string]string, hostList []string, tableChan chan map[string]string) { + printHosts(mutex, hostList, printMap) + tables := <-tableChan + mutex.Lock() + for host, table := range tables { + printMap[host] = table + } + mutex.Unlock() + printHosts(mutex, hostList, printMap) +} + +func printHostsInParallel(hostList []string, connectionMap map[string]net.Conn, tableChan chan map[string]string, interval float64) { + for _, host := range hostList { + go func(host string) { + for { + table := generateQueueTable(host, connectionMap) + tableChan <- map[string]string{host: table} + time.Sleep(time.Duration(interval) * time.Second) + } + }(host) + } +} + +func printHosts(mutex *sync.Mutex, hostList []string, printMap map[string]string) { + mutex.Lock() + defer mutex.Unlock() + // Clear screen + fmt.Fprintf(os.Stdout, "\033[H\033[2J") + currTime := time.Now().Format("2006-01-02 15:04:05") + fmt.Fprintf(os.Stdout, "%s\n\n", currTime) + + for _, host := range hostList { + if table, ok := printMap[host]; ok { + fmt.Fprintln(os.Stdout, table) + } else { + fmt.Fprintf(os.Stdout, "---- %s ----", host) + fmt.Fprintf(os.Stdout, "No data yet...\n\n") + } + } +} + +func generateQueueTable(ogHostname string, connectionMap map[string]net.Conn) string { + hostName := extractHostName(ogHostname) + port, err := determinePort(ogHostname) + if err != nil { + fmt.Fprintf(os.Stderr, "%s %s\n", err, ogHostname) + os.Exit(1) + } + newAddress := fmt.Sprintf("%s:%d", hostName, port) + + queueList, version, err := processGearmanQueues(newAddress, connectionMap) + if err != nil { + return fmt.Sprintf("---- %s:%d ----\n%s\n\n", hostName, port, err) + } + if len(queueList) == 0 { + return fmt.Sprintf("---- %s:%d ----\nNo queues have been found at host %s\n\n", hostName, port, hostName) + } + + table, err := createTable(queueList) + if err != nil { + return fmt.Sprintf("---- %s:%d ----\nError: %s\n\n", hostName, port, err) + } + + return fmt.Sprintf("---- %s:%d ----- %s\n%s", hostName, port, version, table) +} + +func createTable(queueList []queue) (string, error) { + tableHeaders := createTableHeaders() + rows := createTableRows(queueList) + table, err := utils.ASCIITable(tableHeaders, rows, true) + if err != nil { + return "", fmt.Errorf("error creating table -> %w", err) + } + + tableSize := calcTableSize(tableHeaders) + tableHorizontalBorder := strings.Repeat("-", tableSize+1) // Add one for an additional pipe symbol at the end of each row + table = fmt.Sprintf("%s\n%s%s\n\n", tableHorizontalBorder, table, tableHorizontalBorder) + + return table, nil +} + +func calcTableSize(tableHeaders []utils.ASCIITableHeader) int { + tableSize := 0 + for _, header := range tableHeaders { + tableSize += header.Size + tableSize += 3 + } + + return tableSize +} + +func createTableHeaders() []utils.ASCIITableHeader { + tableHeaders := []utils.ASCIITableHeader{ + { + Name: "Queue Name", + Field: "queueName", + }, + { + Name: "Worker Available", + Field: "workerAvailable", + Alignment: "right", + }, + { + Name: "Jobs Waiting", + Field: "jobsWaiting", + Alignment: "right", + }, + { + Name: "Jobs running", + Field: "jobsRunning", + Alignment: "right", + }, + } + + return tableHeaders +} + +func createTableRows(queueList []queue) []dataRow { + rows := make([]dataRow, len(queueList)) + for i, queue := range queueList { + rows[i] = dataRow{ + queueName: queue.Name, + workerAvailable: strconv.Itoa(queue.AvailWorker), + jobsWaiting: strconv.Itoa(queue.Waiting), + jobsRunning: strconv.Itoa(queue.Running), + } + } + + return rows +} + +func printTopUsage() { + fmt.Fprintln(os.Stdout, "usage:") + fmt.Fprintln(os.Stdout) + fmt.Fprintln(os.Stdout, "gearman_top [ -H [:port] ]") + fmt.Fprintln(os.Stdout, " [ -i seconds ]") + fmt.Fprintln(os.Stdout, " [ -q quiet mode ]") + fmt.Fprintln(os.Stdout, " [ -b batch mode ]") + fmt.Fprintln(os.Stdout) + fmt.Fprintln(os.Stdout, " [ -h print help ]") + fmt.Fprintln(os.Stdout, " [ -v verbose output ]") + fmt.Fprintln(os.Stdout, " [ -V print version ]") + fmt.Fprintln(os.Stdout) + + os.Exit(0) +} + +func printTopVersion(build string) { + config := &config{binary: "check_gearman", build: build} + printVersion(config) + os.Exit(3) +} + +func add2HostList(host string, hostList *[]string) error { + *hostList = append(*hostList, host) + + return nil +} + +func unique[T comparable](input []T) []T { + seen := make(map[T]bool) + result := []T{} + + for _, v := range input { + if _, exists := seen[v]; !exists { + seen[v] = true + result = append(result, v) + } + } + + return result +} diff --git a/pkg/modgearman/send_gearman.go b/pkg/modgearman/send_gearman.go index a8c1857..a9310b1 100644 --- a/pkg/modgearman/send_gearman.go +++ b/pkg/modgearman/send_gearman.go @@ -210,6 +210,9 @@ func parseLine2Answer(config *config, result *answer, input string) error { if result.hostName == "" { return fmt.Errorf("invalid data, no hostname parsed") } + if result.hostName == "" { + return fmt.Errorf("invalid data, no hostname parsed") + } return nil } diff --git a/pkg/modgearman/statusAdmin.go b/pkg/modgearman/statusAdmin.go new file mode 100644 index 0000000..fbeb433 --- /dev/null +++ b/pkg/modgearman/statusAdmin.go @@ -0,0 +1,215 @@ +package modgearman + +import ( + "bytes" + "errors" + "fmt" + "io" + "net" + "os" + "sort" + "strconv" + "strings" + "time" +) + +type queue struct { + Name string // queue names + Total int // total number of jobs + Running int // number of running jobs + Waiting int // number of waiting jobs + AvailWorker int // total number of available worker +} + +const ( + gmDefaultPort = 4730 + + readBufferSize = 4000 + columnLength = 4 +) + +// Logic for sorting queues alphabetically +type byQueueName []queue + +func (a byQueueName) Len() int { return len(a) } +func (a byQueueName) Less(i, j int) bool { return a[i].Name < a[j].Name } +func (a byQueueName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func determinePort(address string) (int, error) { + addressParts := strings.Split(address, ":") + hostName := addressParts[0] + + switch len(addressParts) { + case 1: + return getDefaultPort(hostName) + case 2: + port, err := strconv.Atoi(addressParts[1]) + if err != nil { + return -1, fmt.Errorf("error converting port %s to int -> %w", address, err) + } + + return port, nil + default: + return -1, errors.New("too many colons in address") + } +} + +func getDefaultPort(hostname string) (int, error) { + if hostname == "localhost" || hostname == "127.0.0.1" { + envServer := os.Getenv("CONFIG_GEARMAND_PORT") + if envServer != "" { + port, err := strconv.Atoi(strings.Split(envServer, ":")[1]) + if err != nil { + return -1, fmt.Errorf("error converting port %s to int -> %w", envServer, err) + } + + return port, nil + } + } + + return gmDefaultPort, nil +} + +func extractHostName(address string) string { + return strings.Split(address, ":")[0] +} + +func processGearmanQueues(address string, connectionMap map[string]net.Conn) ([]queue, string, error) { + payload, err := queryGermanInstance(address, connectionMap) + if err != nil { + return nil, "", err + } + // Split retrieved payload and extract and store data + version := "" + lines := strings.Split(payload, "\n") + + if len(lines) == 0 { + return nil, "", nil + } + + queueList := []queue{} + for _, row := range lines { + columns := strings.Fields(row) + + if len(columns) == 2 && columns[0] == "OK" { + version = columns[1] + + continue + } + + if len(columns) < columnLength { + continue + } + + totalInt, err := strconv.Atoi(columns[1]) + if err != nil { + return nil, "", fmt.Errorf("the received data is not in the right format -> %w", err) + } + runningInt, err := strconv.Atoi(columns[2]) + if err != nil { + return nil, "", fmt.Errorf("the received data is not in the right format -> %w", err) + } + availWorkerInt, err := strconv.Atoi(columns[3]) + if err != nil { + return nil, "", fmt.Errorf("the received data is not in the right format -> %w", err) + } + + // Skip dummy queue if empty + if columns[0] == "dummy" && totalInt == 0 { + continue + } + + queueList = append(queueList, queue{ + Name: columns[0], + Total: totalInt, + Running: runningInt, + AvailWorker: availWorkerInt, + Waiting: totalInt - runningInt, + }) + } + + sort.Sort(byQueueName(queueList)) + + // Add v before version number for better formatting + if version != "" { + version = fmt.Sprintf("v%s", version) + } + + return queueList, version, nil +} + +func queryGermanInstance(address string, connectionMap map[string]net.Conn) (string, error) { + // Look for existing connection in connMap + // If no connection is found establish a new one with the host and save it to connMap for future use + conn, exists := connectionMap[address] + if !exists { + var err error + conn, err = makeConnection(address) + if err != nil { + return "", err + } + connectionMap[address] = conn + } + if err := writeConnection(conn, "status\nversion\n"); err != nil { + delete(connectionMap, address) + + return "", err + } + + payload, err := readConnection(conn) + if err != nil { + return "", err + } + + return payload, nil +} + +func makeConnection(address string) (net.Conn, error) { + conn, err := net.DialTimeout("tcp", address, connTimeout*time.Second) + if err != nil { + return nil, fmt.Errorf("timeout or error with tcp connection -> %w", err) + } + + return conn, nil +} + +func writeConnection(conn net.Conn, cmd string) error { + err := conn.SetWriteDeadline(time.Now().Add(connTimeout * time.Second)) + if err != nil { + return fmt.Errorf("error while setting write deadline -> %w", err) + } + _, err = conn.Write([]byte(cmd)) + if err != nil { + return fmt.Errorf("error while writing to tcp connection -> %w", err) + } + + return nil +} + +func readConnection(conn net.Conn) (string, error) { + err := conn.SetReadDeadline(time.Now().Add(connTimeout * time.Second)) + if err != nil { + return "", fmt.Errorf("error while setting read deadline -> %w", err) + } + var buffer bytes.Buffer + tmp := make([]byte, readBufferSize) + + for { + numReadBytes, err := conn.Read(tmp) + if numReadBytes > 0 { + buffer.Write(tmp[:numReadBytes]) + } + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return "", fmt.Errorf("error while reading from tcp connection -> %w", err) + } + if numReadBytes > 0 && tmp[numReadBytes-1] == '\n' { + break + } + } + + return buffer.String(), nil +} diff --git a/pkg/utils/asciiTable.go b/pkg/utils/asciiTable.go new file mode 100644 index 0000000..29360db --- /dev/null +++ b/pkg/utils/asciiTable.go @@ -0,0 +1,142 @@ +package utils + +import ( + "fmt" + "reflect" + "strings" +) + +const maxLineLength = 120 + +type ASCIITableHeader struct { + Name string // name in table header + Field string // attribute name in data row + Alignment string // flag whether column is aligned to the right + Size int // calculated max Size of column +} + +// ASCIITable creates an ascii table from columns and data rows +func ASCIITable(header []ASCIITableHeader, rows interface{}, escapePipes bool) (string, error) { + dataRows := reflect.ValueOf(rows) + if dataRows.Kind() != reflect.Slice { + return "", fmt.Errorf("rows is not a slice") + } + + err := calculateHeaderSize(header, dataRows, escapePipes) + if err != nil { + return "", err + } + + // output header + out := "" + for _, head := range header { + out += fmt.Sprintf(fmt.Sprintf("| %%-%ds ", head.Size), head.Name) + } + out += "|\n" + + // output separator + for _, head := range header { + padding := " " + out += fmt.Sprintf("|%s%s%s", padding, strings.Repeat("-", head.Size), padding) + } + out += "|\n" + + // output data + for i := range dataRows.Len() { + rowVal := dataRows.Index(i) + for _, head := range header { + value, _ := asciiTableRowValue(escapePipes, rowVal, head) + + switch head.Alignment { + case "right": + out += fmt.Sprintf(fmt.Sprintf("| %%%ds ", head.Size), value) + case "left", "": + out += fmt.Sprintf(fmt.Sprintf("| %%-%ds ", head.Size), value) + case "centered": + padding := (head.Size - len(value)) / 2 + out += fmt.Sprintf("| %*s%-*s ", padding, "", head.Size-padding, value) + default: + err := fmt.Errorf("unsupported alignment '%s' in table", head.Alignment) + + return "", err + } + } + out += "|\n" + } + + return out, nil +} + +func asciiTableRowValue(escape bool, rowVal reflect.Value, head ASCIITableHeader) (string, error) { + value := "" + field := rowVal.FieldByName(head.Field) + if field.IsValid() { + t := field.Type().String() + switch t { + case "string": + value = field.String() + default: + return "", fmt.Errorf("unsupported struct attribute type for field %s: %s", head.Field, t) + } + } + + if escape { + value = strings.ReplaceAll(value, "\n", `\n`) + value = strings.ReplaceAll(value, "|", "\\|") + value = strings.ReplaceAll(value, "$", "\\$") + value = strings.ReplaceAll(value, "*", "\\*") + } + + return value, nil +} + +func calculateHeaderSize(header []ASCIITableHeader, dataRows reflect.Value, escapePipes bool) error { + // set headers as minimum Size + for i, head := range header { + header[i].Size = len(head.Name) + } + + // adjust column Size from max row data + for i := range dataRows.Len() { + rowVal := dataRows.Index(i) + if rowVal.Kind() != reflect.Struct { + return fmt.Errorf("row %d is not a struct", i) + } + for num, head := range header { + value, err := asciiTableRowValue(escapePipes, rowVal, head) + if err != nil { + return err + } + length := len(value) + if length > header[num].Size { + header[num].Size = length + } + } + } + + // calculate total line length + total := 0 + for i := range header { + total += header[i].Size + 3 // add padding + } + + if total < maxLineLength { + return nil + } + + avgAvail := maxLineLength / len(header) + tooWide := []int{} + sumTooWide := 0 + for i := range header { + if header[i].Size > avgAvail { + tooWide = append(tooWide, i) + sumTooWide += header[i].Size + } + } + avgLargeCol := (maxLineLength - (total - sumTooWide)) / len(tooWide) + for _, i := range tooWide { + header[i].Size = avgLargeCol + } + + return nil +}