Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OA-55 Implement a Prometheus Exporter Proxy (similar to the exporter_exporter) #87

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 58 additions & 11 deletions agentrt/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,23 @@ type AgentInstance struct {
shutdown chan struct{}
reload chan chan struct{}

stateWebserver chan []byte
statePushClient chan []byte
checkResult chan map[string]interface{}
customCheckResultChan chan *checkrunner.CustomCheckResult
stateWebserver chan []byte
statePushClient chan []byte
prometheusStateWebserver chan map[string]string
checkResult chan map[string]interface{}
customCheckResultChan chan *checkrunner.CustomCheckResult
prometheusExporterResultChan chan *checkrunner.PrometheusExporterResult

customCheckResults map[string]interface{}

logHandler *loghandler.LogHandler
webserver *webserver.Server
checkRunner *checkrunner.CheckRunner
customCheckHandler *checkrunner.CustomCheckHandler
pushClient *pushclient.PushClient
prometheusExporterResults map[string]string

logHandler *loghandler.LogHandler
webserver *webserver.Server
checkRunner *checkrunner.CheckRunner
customCheckHandler *checkrunner.CustomCheckHandler
prometheusCheckHandler *checkrunner.PrometheusCheckHandler
pushClient *pushclient.PushClient
}

func (a *AgentInstance) processCheckResult(result map[string]interface{}) {
Expand All @@ -50,6 +55,22 @@ func (a *AgentInstance) processCheckResult(result map[string]interface{}) {
result["customchecks"] = a.customCheckResults
}

prometheus_results_data := make(map[string]string, len(a.prometheusExporterResults))
if a.prometheusExporterResults == nil {
result["prometheus_exporters"] = "[]"
} else {
// Merge the name of all available prometheus exporters into the check result
keys := make([]string, len(a.prometheusExporterResults))
i := 0
for k, result := range a.prometheusExporterResults {
keys[i] = k
prometheus_results_data[k] = result
i++
}

result["prometheus_exporters"] = keys
}

data, err := json.Marshal(result)
if err != nil {
log.Errorln("Internal error: could not serialize check result: ", err)
Expand All @@ -73,6 +94,7 @@ func (a *AgentInstance) processCheckResult(result map[string]interface{}) {
// we may have to give the webserver some time to think about it
select {
case a.stateWebserver <- data: // Pass checkresult json to webserver
case a.prometheusStateWebserver <- prometheus_results_data: // Pass Prometheus Exporter data to webserver
case <-t.C:
log.Errorln("Internal error: could not store check result for webserver: timeout")
}
Expand Down Expand Up @@ -104,6 +126,9 @@ func (a *AgentInstance) doReload(ctx context.Context, cfg *config.Configuration)
if a.checkResult == nil {
a.checkResult = make(chan map[string]interface{})
}
if a.prometheusStateWebserver == nil {
a.prometheusStateWebserver = make(chan map[string]string)
}

// we do not stop the webserver on every reload for better availability during the wizard setup

Expand All @@ -114,8 +139,9 @@ func (a *AgentInstance) doReload(ctx context.Context, cfg *config.Configuration)

if a.webserver == nil && (!cfg.OITC.Push || (cfg.OITC.Push && cfg.OITC.EnableWebserver)) {
a.webserver = &webserver.Server{
StateInput: a.stateWebserver,
Reloader: a, // Set agent instance to Reloader interface for the webserver handler
StateInput: a.stateWebserver,
PrometheusInput: a.prometheusStateWebserver,
Reloader: a, // Set agent instance to Reloader interface for the webserver handler
}
a.webserver.Start(ctx)
}
Expand Down Expand Up @@ -154,6 +180,7 @@ func (a *AgentInstance) doReload(ctx context.Context, cfg *config.Configuration)
}
}
a.doCustomCheckReload(ctx, cfg.CustomCheckConfiguration)
a.doPrometheusExporterCheckReload(ctx, cfg.PrometheusExporterConfiguration)
}

func (a *AgentInstance) doCustomCheckReload(ctx context.Context, ccc []*config.CustomCheck) {
Expand All @@ -170,6 +197,20 @@ func (a *AgentInstance) doCustomCheckReload(ctx context.Context, ccc []*config.C
}
}

func (a *AgentInstance) doPrometheusExporterCheckReload(ctx context.Context, exporters []*config.PrometheusExporter) {
if a.prometheusCheckHandler != nil {
a.prometheusCheckHandler.Shutdown()
a.prometheusCheckHandler = nil
}
if len(exporters) > 0 {
a.prometheusCheckHandler = &checkrunner.PrometheusCheckHandler{
Configuration: exporters,
ResultOutput: a.prometheusExporterResultChan,
}
a.prometheusCheckHandler.Start(ctx)
}
}

func (a *AgentInstance) stop() {
wg := sync.WaitGroup{}
if a.logHandler != nil {
Expand Down Expand Up @@ -221,6 +262,8 @@ func (a *AgentInstance) Start(parent context.Context) {
a.checkResult = make(chan map[string]interface{})
a.customCheckResultChan = make(chan *checkrunner.CustomCheckResult)
a.customCheckResults = map[string]interface{}{}
a.prometheusExporterResultChan = make(chan *checkrunner.PrometheusExporterResult)
a.prometheusExporterResults = make(map[string]string)
a.shutdown = make(chan struct{})
a.reload = make(chan chan struct{})
a.logHandler = &loghandler.LogHandler{
Expand Down Expand Up @@ -268,7 +311,11 @@ func (a *AgentInstance) Start(parent context.Context) {
case res := <-a.customCheckResultChan:
// received check result from customcheckhandler
a.customCheckResults[res.Name] = res.Result
case res := <-a.prometheusExporterResultChan:
// received check result from prometheus exporter
a.prometheusExporterResults[res.Name] = res.Result
}

}
}()

Expand Down
103 changes: 103 additions & 0 deletions checkrunner/prometheuscheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package checkrunner

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/it-novum/openitcockpit-agent-go/config"
log "github.com/sirupsen/logrus"
)

type PrometheusCheckExecutor struct {
Configuration *config.PrometheusExporter
ResultOutput chan *PrometheusExporterResult

wg sync.WaitGroup
shutdown chan struct{}
}

func (c *PrometheusCheckExecutor) Shutdown() {
close(c.shutdown)
c.wg.Wait()
}

func (c *PrometheusCheckExecutor) runCheck(ctx context.Context, timeout time.Duration) {
log.Debugln("Begin Prometheus Exporter: ", c.Configuration.Name)

client := &http.Client{
Timeout: timeout,
}

url := fmt.Sprintf("http://%s:%d%s", "localhost", c.Configuration.Port, c.Configuration.Path)
resp, err := client.Get(url)
if err != nil {
log.Infoln("Prometheus Exporter '", c.Configuration.Name, "' error: ", err)
}

defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
log.Infoln("Prometheus Exporter Error reading response body '", c.Configuration.Name, "' error: ", err)
return
}

select {
// Return custom check result to Agent Instance
case c.ResultOutput <- &PrometheusExporterResult{
Name: c.Configuration.Name,
Result: string(body),
}:
case <-time.After(time.Second * 5):
log.Errorln("Internal error: timeout could not save Prometheus Exporter result")
case <-c.shutdown:
log.Errorln("Prometheus Exporte: canceled")
return
case <-ctx.Done():
log.Errorln("Prometheus Exporte: canceled")
return
}
log.Debugln("Finish Prometheus Exporte: ", c.Configuration.Name)
}

func (c *PrometheusCheckExecutor) Start(parent context.Context) error {
c.shutdown = make(chan struct{})
timeout := time.Duration(c.Configuration.Timeout) * time.Second
interval := time.Duration(c.Configuration.Interval) * time.Second

if timeout > interval {
return errors.New("custom check timeout must be lower or equal to interval")
}

c.wg.Add(1)
go func() {
defer c.wg.Done()

ctx, cancel := context.WithCancel(parent)
defer cancel()

ticker := time.NewTicker(interval)
defer ticker.Stop()

c.runCheck(ctx, timeout)
for {
select {
case <-ctx.Done():
return
case _, ok := <-c.shutdown:
if !ok {
return
}
case <-ticker.C:
c.runCheck(ctx, timeout)
}
}
}()

return nil
}
101 changes: 101 additions & 0 deletions checkrunner/prometheuscheckhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package checkrunner

import (
"context"
"sync"

"github.com/it-novum/openitcockpit-agent-go/config"
log "github.com/sirupsen/logrus"
)

type PrometheusExporterResult struct {
Name string
Result string
}

// PrometheusCheckHandler runs proemtheus exporter
type PrometheusCheckHandler struct {
// ResultOutput channel for check results
// Do not close before Shutdown completes
ResultOutput chan *PrometheusExporterResult
Configuration []*config.PrometheusExporter

executors []*PrometheusCheckExecutor
shutdown chan struct{}
wg sync.WaitGroup
}

// stop all custom check executors in parallel
// the cancel of the context should cause all executors to stop almost immediatly
func (c *PrometheusCheckHandler) stopExecutors() {
if len(c.executors) < 1 {
return
}

stopC := make(chan *PrometheusCheckExecutor)

for i := 0; i < len(c.executors); i++ {
go func() {
for e := range stopC {
e.Shutdown()
log.Infoln("Prometheus Exporter ", e.Configuration.Name, " stopped")
c.wg.Done()
}
}()
}

for _, executor := range c.executors {
stopC <- executor
}

close(stopC)
}

// Run the custom checks in background (DO NOT RUN IN GO ROUTINE)
func (c *PrometheusCheckHandler) Start(parentCtx context.Context) {
c.shutdown = make(chan struct{})
c.executors = make([]*PrometheusCheckExecutor, len(c.Configuration))

for i, checkConfig := range c.Configuration {
c.executors[i] = &PrometheusCheckExecutor{
Configuration: checkConfig,
ResultOutput: c.ResultOutput,
}
}

c.wg.Add(1)
go func() {
defer c.wg.Done()

ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

for _, executor := range c.executors {
log.Infoln("Custom Check ", executor.Configuration.Name, " starting")
c.wg.Add(1)
if err := executor.Start(ctx); err != nil {
log.Errorln(err)
}
}

defer c.stopExecutors()

for {
select {
case <-ctx.Done():
return
case _, ok := <-c.shutdown:
if !ok {
return
}
}
}

}()
}

// Shutdown custom check runner, waits for completion
func (c *PrometheusCheckHandler) Shutdown() {
close(c.shutdown)
c.wg.Wait()
}
Loading