diff --git a/Makefile b/Makefile index 2d64195..c1d3130 100644 --- a/Makefile +++ b/Makefile @@ -17,11 +17,6 @@ all:: ARCH = $(shell go env GOARCH) OS = $(shell go env GOOS) -# Needs to be defined before including Makefile.common to auto-generate targets -DOCKER_ARCHS ?= amd64 - -DOCKER_IMAGE_NAME ?= emqx-exporter:latest - .PHONY: build LOCALBIN build: CGO_ENABLED=0 GOOS=${OS} GOARCH=${ARCH} go build -o .build/${OS}-${ARCH}/emqx-exporter @@ -30,8 +25,9 @@ build: test: go test -race --cover -covermode=atomic -coverpkg=./... -coverprofile=cover.out ./... +DOCKER_IMAGE_NAME ?= emqx-exporter .PHONY: docker-build -docker-build: build +docker-build: docker build -t ${DOCKER_IMAGE_NAME} . ## Location to install dependencies to diff --git a/examples/docker-compose/docker-compose.yml b/examples/docker-compose/docker-compose.yml index 2a4a894..8eb6a86 100644 --- a/examples/docker-compose/docker-compose.yml +++ b/examples/docker-compose/docker-compose.yml @@ -1,7 +1,7 @@ version: '3.8' services: emqx: - image: emqx/emqx-enterprise:5.0.1 + image: emqx/emqx-enterprise:5.3 container_name: emqx-demo ports: - 18083:18083 @@ -11,11 +11,19 @@ services: - 8883:8883 environment: EMQX_DASHBOARD__BOOTSTRAP_USERS_FILE: '"/opt/emqx/data/api_secret"' + EMQX_API_KEY__BOOTSTRAP_FILE: '"/opt/emqx/data/api_secret"' volumes: - ./api_secret:/opt/emqx/data/api_secret + healthcheck: + test: ["CMD", "emqx", "ctl", "status"] + interval: 30s + timeout: 10s + retries: 3 emqx-exporter: - image: emqx/emqx-exporter + depends_on: + - emqx + image: emqx-exporter container_name: exporter-demo ports: - 8085:8085 diff --git a/examples/docker-compose/prometheus-emqx5.yaml b/examples/docker-compose/prometheus-emqx5.yaml index 1138546..d9b1581 100644 --- a/examples/docker-compose/prometheus-emqx5.yaml +++ b/examples/docker-compose/prometheus-emqx5.yaml @@ -28,4 +28,14 @@ scrape_configs: # label the cluster name of where the metrics data from cluster: test # fix value, don't modify - from: exporter \ No newline at end of file + from: exporter + - job_name: 'mqtt-probe' + metrics_path: /probe + scrape_interval: 5s + static_configs: + - targets: [exporter-demo:8085] + labels: + # label the cluster name of where the metrics data from + cluster: test + # fix value, don't modify + from: probe diff --git a/go.mod b/go.mod index 9a237ae..31d54a2 100644 --- a/go.mod +++ b/go.mod @@ -19,8 +19,10 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/klauspost/compress v1.16.4 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/go.sum b/go.sum index 2aeb7a3..0156b0a 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= @@ -28,6 +30,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= diff --git a/main.go b/main.go index fc8ca9d..67edd64 100644 --- a/main.go +++ b/main.go @@ -15,11 +15,8 @@ package main import ( "emqx-exporter/client" + "emqx-exporter/prober" "fmt" - "github.com/alecthomas/kingpin/v2" - promcollectors "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/common/promlog" - "github.com/prometheus/common/promlog/flag" stdlog "log" "net/http" _ "net/http/pprof" @@ -28,7 +25,13 @@ import ( "runtime" "sort" + "github.com/alecthomas/kingpin/v2" + promcollectors "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/common/promlog" + "github.com/prometheus/common/promlog/flag" + "emqx-exporter/collector" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -143,6 +146,10 @@ func main() { http.Handle("/metrics", newHandler(!*disableExporterMetrics, *maxRequests, logger)) + http.HandleFunc("/probe", func(w http.ResponseWriter, r *http.Request) { + prober.Handler(w, r, logger) + }) + landingConfig := web.LandingConfig{ Name: "EMQX Exporter", Description: "EMQX Exporter", @@ -152,6 +159,10 @@ func main() { Address: "/metrics", Text: "Metrics", }, + { + Address: "/probe", + Text: "Probe", + }, }, } landingPage, err := web.NewLandingPage(landingConfig) diff --git a/prober/handler.go b/prober/handler.go new file mode 100644 index 0000000..a0d25c3 --- /dev/null +++ b/prober/handler.go @@ -0,0 +1,44 @@ +package prober + +import ( + "net/http" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +func Handler(w http.ResponseWriter, r *http.Request, logger log.Logger) { + // This is a stub function that should be implemented by the user. + probeSuccessGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "emqx", + Subsystem: "mqtt", + Name: "probe_success", + Help: "Displays whether or not the probe was a success", + }) + probeDurationGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "emqx", + Subsystem: "mqtt", + Name: "probe_duration_seconds", + Help: "Returns how long the probe took to complete in seconds", + }) + + start := time.Now() + registry := prometheus.NewRegistry() + registry.MustRegister(probeSuccessGauge) + registry.MustRegister(probeDurationGauge) + success := ProbeMQTT(logger) + duration := time.Since(start).Seconds() + probeDurationGauge.Set(duration) + if success { + probeSuccessGauge.Set(1) + level.Info(logger).Log("msg", "Probe succeeded", "duration_seconds", duration) + } else { + level.Error(logger).Log("msg", "Probe failed", "duration_seconds", duration) + } + + h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{}) + h.ServeHTTP(w, r) +} diff --git a/prober/mqtt.go b/prober/mqtt.go new file mode 100644 index 0000000..9daf577 --- /dev/null +++ b/prober/mqtt.go @@ -0,0 +1,69 @@ +package prober + +import ( + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/go-kit/log" + "github.com/go-kit/log/level" +) + +type MQTTProbe struct { + Client mqtt.Client + MsgChan <-chan mqtt.Message +} + +var mqttProbe *MQTTProbe + +func initMQTTProbe(logger log.Logger) *MQTTProbe { + opt := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx-exporter") + opt.SetOnConnectHandler(func(c mqtt.Client) { + level.Info(logger).Log("msg", "Connected to MQTT broker") + }) + opt.SetConnectionLostHandler(func(c mqtt.Client, err error) { + level.Error(logger).Log("msg", "Lost connection to MQTT broker", "err", err) + }) + c := mqtt.NewClient(opt) + if token := c.Connect(); token.Wait() && token.Error() != nil { + level.Error(logger).Log("msg", "Failed to connect to MQTT broker", "err", token.Error()) + panic(token.Error()) + } + + var msgChan = make(chan mqtt.Message) + if token := c.Subscribe("emqx-exporter", 1, func(c mqtt.Client, m mqtt.Message) { + msgChan <- m + }); token.Wait() && token.Error() != nil { + level.Error(logger).Log("msg", "Failed to subscribe to MQTT topic", "err", token.Error()) + panic(token.Error()) + } + + return &MQTTProbe{ + Client: c, + MsgChan: msgChan, + } +} + +func ProbeMQTT(logger log.Logger) bool { + if mqttProbe == nil { + mqttProbe = initMQTTProbe(logger) + } + + if !mqttProbe.Client.IsConnected() { + return false + } + + if token := mqttProbe.Client.Publish("emqx-exporter", 1, false, "hello world"); token.Wait() && token.Error() != nil { + return false + } + + select { + case msg := <-mqttProbe.MsgChan: + if msg == nil { + return false + } + case <-time.After(5 * time.Second): + return false + } + + return true +}