Skip to content

Commit

Permalink
feat: add config file
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Oct 11, 2023
1 parent 65321bf commit 7da86fd
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 11 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ FROM quay.io/prometheus/busybox:latest
LABEL maintainer="EMQX"

COPY --from=builder /workspace/emqx-exporter /bin/emqx-exporter
COPY config/example/config.yaml /etc/emqx-exporter/config.yaml

EXPOSE 8085
USER nobody
Expand Down
91 changes: 91 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package config

import (
"fmt"
"os"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

yaml "gopkg.in/yaml.v3"
)

type Config struct {
Probes []Probe `yaml:"probes"`
}

type Probe struct {
Target string `yaml:"target"`
Scheme string `yaml:"scheme,omitempty"`
ClientID string `yaml:"client_id,omitempty"`
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
Topic string `yaml:"topic,omitempty"`
QoS byte `yaml:"qos,omitempty"`
}

type SafeConfig struct {
sync.RWMutex
C *Config
configReloadSuccess prometheus.Gauge
configReloadSeconds prometheus.Gauge
}

func NewSafeConfig(reg prometheus.Registerer) *SafeConfig {
configReloadSuccess := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "emqx_exporter",
Name: "config_last_reload_successful",
Help: "EMQX exporter config loaded successfully.",
})

configReloadSeconds := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "emqx_exporter",
Name: "config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful configuration reload.",
})
return &SafeConfig{C: &Config{}, configReloadSuccess: configReloadSuccess, configReloadSeconds: configReloadSeconds}
}

func (sc *SafeConfig) ReloadConfig(confFile string) (err error) {
var c = &Config{}
defer func() {
if err != nil {
sc.configReloadSuccess.Set(0)
} else {
sc.configReloadSuccess.Set(1)
sc.configReloadSeconds.SetToCurrentTime()
}
}()

yamlReader, err := os.Open(confFile)
if err != nil {
return fmt.Errorf("error reading config file: %s", err)
}
defer yamlReader.Close()
decoder := yaml.NewDecoder(yamlReader)
decoder.KnownFields(true)

if err = decoder.Decode(c); err != nil {
return fmt.Errorf("error parsing config file: %s", err)
}

for index, probe := range c.Probes {
if probe.Scheme == "" {
probe.Scheme = "tcp"
}
if probe.ClientID == "" {
probe.ClientID = "emqx_exporter_probe"
}
if probe.Topic == "" {
probe.Topic = "emqx_exporter_probe"
}
c.Probes[index] = probe
}

sc.Lock()
sc.C = c
sc.Unlock()

return nil
}
8 changes: 8 additions & 0 deletions config/example/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
probes:
- target: broker.emqx.io:1883
scheme:
client_id:
username:
password:
topic:
qos:
5 changes: 4 additions & 1 deletion examples/docker-compose/prometheus-emqx5.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ scrape_configs:
# fix value, don't modify
from: exporter
- job_name: 'mqtt-probe'
metrics_path: /probe
metrics_path: '/probe'
params:
target:
- "broker.emqx.io:1883"
scrape_interval: 5s
static_configs:
- targets: [exporter-demo:8085]
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,4 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
44 changes: 42 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package main

import (
"emqx-exporter/client"
"emqx-exporter/config"
"emqx-exporter/prober"

"fmt"
stdlog "log"
"net/http"
Expand All @@ -29,6 +31,7 @@ import (
promcollectors "github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"
"gopkg.in/yaml.v2"

"emqx-exporter/collector"

Expand Down Expand Up @@ -112,6 +115,16 @@ func newHandler(includeExporterMetrics bool, maxRequests int, logger log.Logger)
}
}

var (
sc = config.NewSafeConfig(prometheus.DefaultRegisterer)

configFile = kingpin.Flag("config.file", "EMQX exporter configuration file.").Default("/etc/emqx-exporter/config.yaml").String()
)

func init() {
prometheus.MustRegister(version.NewCollector("emqx_exporter"))
}

func main() {
var (
disableExporterMetrics = kingpin.Flag(
Expand All @@ -134,10 +147,17 @@ func main() {
kingpin.CommandLine.UsageWriter(os.Stdout)
kingpin.HelpFlag.Short('h')
kingpin.Parse()
logger := promlog.New(promlogConfig)

logger := promlog.New(promlogConfig)
level.Info(logger).Log("msg", "Starting emqx-exporter", "version", version.Info())
level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext())

if err := sc.ReloadConfig(*configFile); err != nil {
level.Error(logger).Log("msg", "Error loading config", "err", err)
os.Exit(1)
}
level.Info(logger).Log("msg", "Loaded config file")

if user, err := user.Current(); err == nil && user.Uid == "0" {
level.Warn(logger).Log("msg", "EMQX Exporter is running as root user. This exporter is designed to run as unprivileged user, root is not required.")
}
Expand All @@ -147,7 +167,23 @@ func main() {
http.Handle("/metrics", newHandler(!*disableExporterMetrics, *maxRequests, logger))

http.HandleFunc("/probe", func(w http.ResponseWriter, r *http.Request) {
prober.Handler(w, r, logger)
sc.Lock()
probes := sc.C.Probes
sc.Unlock()
prober.Handler(w, r, probes, logger, nil)
})

http.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {
sc.RLock()
c, err := yaml.Marshal(sc.C)
sc.RUnlock()
if err != nil {
level.Warn(logger).Log("msg", "Error marshalling configuration", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/plain")
w.Write(c)
})

landingConfig := web.LandingConfig{
Expand All @@ -163,6 +199,10 @@ func main() {
Address: "/probe",
Text: "Probe",
},
{
Address: "/config",
Text: "Config",
},
},
}
landingPage, err := web.NewLandingPage(landingConfig)
Expand Down
26 changes: 24 additions & 2 deletions prober/handler.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
package prober

import (
"emqx-exporter/config"
"fmt"

"net/http"
"net/url"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func Handler(w http.ResponseWriter, r *http.Request, logger log.Logger) {
func Handler(w http.ResponseWriter, r *http.Request, probes []config.Probe, logger log.Logger, params url.Values) {
var probe config.Probe
if params == nil {
params = r.URL.Query()
}
target := params.Get("target")
for i := 0; i < len(probes); i++ {
if probes[i].Target == target {
probe = probes[i]
break
}
}
if probe.Target == "" {
http.Error(w, fmt.Sprintf("Unknown probe target %q", target), http.StatusBadRequest)
level.Debug(logger).Log("msg", "Unknown probe target", "target", target)
return
}

probeSuccessGauge := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "emqx",
Subsystem: "mqtt",
Expand All @@ -28,7 +50,7 @@ func Handler(w http.ResponseWriter, r *http.Request, logger log.Logger) {
registry.MustRegister(probeDurationGauge)

start := time.Now()
if ProbeMQTT(logger) {
if ProbeMQTT(probe, logger) {
probeSuccessGauge.Set(1)
} else {
probeSuccessGauge.Set(0)
Expand Down
13 changes: 7 additions & 6 deletions prober/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package prober

import (
"emqx-exporter/config"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
Expand All @@ -15,8 +16,8 @@ type MQTTProbe struct {

var mqttProbe *MQTTProbe

func initMQTTProbe(logger log.Logger) (*MQTTProbe, error) {
opt := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx-exporter")
func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) {
opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target).SetClientID(probe.ClientID).SetUsername(probe.Username).SetPassword(probe.Password)
opt.SetOnConnectHandler(func(c mqtt.Client) {
level.Info(logger).Log("msg", "Connected to MQTT broker")
})
Expand All @@ -30,7 +31,7 @@ func initMQTTProbe(logger log.Logger) (*MQTTProbe, error) {
}

var msgChan = make(chan mqtt.Message)
if token := c.Subscribe("emqx-exporter", 1, func(c mqtt.Client, m mqtt.Message) {
if token := c.Subscribe(probe.Topic, probe.QoS, func(c mqtt.Client, m mqtt.Message) {
msgChan <- m
}); token.Wait() && token.Error() != nil {
level.Error(logger).Log("msg", "Failed to subscribe to MQTT topic", "err", token.Error())
Expand All @@ -43,10 +44,10 @@ func initMQTTProbe(logger log.Logger) (*MQTTProbe, error) {
}, nil
}

func ProbeMQTT(logger log.Logger) bool {
func ProbeMQTT(probe config.Probe, logger log.Logger) bool {
if mqttProbe == nil {
var err error
if mqttProbe, err = initMQTTProbe(logger); err != nil {
if mqttProbe, err = initMQTTProbe(probe, logger); err != nil {
return false
}
}
Expand All @@ -55,7 +56,7 @@ func ProbeMQTT(logger log.Logger) bool {
return false
}

if token := mqttProbe.Client.Publish("emqx-exporter", 1, false, "hello world"); token.Wait() && token.Error() != nil {
if token := mqttProbe.Client.Publish(probe.Topic, probe.QoS, false, "hello world"); token.Wait() && token.Error() != nil {
return false
}

Expand Down

0 comments on commit 7da86fd

Please sign in to comment.