Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use config for metrics #61

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions client/client_4.x.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ package client
import (
"emqx-exporter/collector"
"fmt"
jsoniter "github.com/json-iterator/go"
"github.com/valyala/fasthttp"
"net/http"
"strconv"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/valyala/fasthttp"
)

var _ client = &cluster4x{}

type cluster4x struct {
version string
client *fasthttp.Client
username string
password string
version string
client *fasthttp.Client
}

func (n *cluster4x) getVersion() string {
Expand All @@ -30,8 +33,7 @@ func (n *cluster4x) getLicense() (lic *collector.LicenseInfo, err error) {
}
Code int
}{}

data, statusCode, err := callHTTPGet(n.client, "/api/v4/license")
data, statusCode, err := callHTTPGet(n.client, "/api/v4/license", n.username, n.password)
if statusCode == http.StatusNotFound {
// open source version doesn't support license api
err = nil
Expand Down Expand Up @@ -80,7 +82,7 @@ func (n *cluster4x) getClusterStatus() (cluster collector.ClusterStatus, err err
}
Code int
}{}
err = callHTTPGetWithResp(n.client, "/api/v4/nodes", &resp)
err = callHTTPGetWithResp(n.client, "/api/v4/nodes", n.username, n.password, &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -121,7 +123,7 @@ func (n *cluster4x) getBrokerMetrics() (metrics *collector.Broker, err error) {
}
Code int
}{}
data, statusCode, err := callHTTPGet(n.client, "/api/v4/monitor/current_metrics")
data, statusCode, err := callHTTPGet(n.client, "/api/v4/monitor/current_metrics", n.username, n.password)
if statusCode == http.StatusNotFound {
// open source version doesn't support this api
err = nil
Expand Down Expand Up @@ -153,30 +155,30 @@ func (n *cluster4x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
resp := struct {
Data []struct {
Metrics []struct {
Node string `json:"node"`
Node string `json:"node"`
SpeedMax float64 `json:"speed_max"`
SpeedLast5m float64 `json:"speed_last5m"`
Speed float64 `json:"speed"`
Matched int64 `json:"matched"`
Passed int64 `json:"passed"`
NoResult int64 `json:"no_result"`
Exception int64 `json:"exception"`
Failed int64 `json:"failed"`
Matched int64 `json:"matched"`
Passed int64 `json:"passed"`
NoResult int64 `json:"no_result"`
Exception int64 `json:"exception"`
Failed int64 `json:"failed"`
}
Actions []struct {
Metrics []struct {
Node string `json:"node"`
Taken int64 `json:"taken"`
Success int64 `json:"success"`
Failed int64 `json:"failed"`
Taken int64 `json:"taken"`
Success int64 `json:"success"`
Failed int64 `json:"failed"`
}
}
ID string `json:"id"`
Enabled bool
}
Code int
}{}
err = callHTTPGetWithResp(n.client, "/api/v4/rules?_limit=10000", &resp)
err = callHTTPGetWithResp(n.client, "/api/v4/rules?_limit=10000", n.username, n.password, &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -226,21 +228,21 @@ func (n *cluster4x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
}

func (n *cluster4x) getDataBridge() (bridges []collector.DataBridge, err error) {
bridgesResp := struct {
resp := struct {
Data []struct {
ID string `json:"id"`
Type string
Status bool
}
Code int
}{}
err = callHTTPGetWithResp(n.client, "/api/v4/resources", &bridgesResp)
err = callHTTPGetWithResp(n.client, "/api/v4/resources", n.username, n.password, &resp)
if err != nil {
return
}

bridges = make([]collector.DataBridge, len(bridgesResp.Data))
for i, data := range bridgesResp.Data {
bridges = make([]collector.DataBridge, len(resp.Data))
for i, data := range resp.Data {
enabled := unhealthy
if data.Status {
enabled = healthy
Expand Down
45 changes: 24 additions & 21 deletions client/client_5.x.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ package client
import (
"emqx-exporter/collector"
"fmt"
"github.com/valyala/fasthttp"
"strconv"
"time"

"github.com/valyala/fasthttp"
)

var _ client = &cluster5x{}

type cluster5x struct {
version string
edition edition
client *fasthttp.Client
username string
password string
version string
edition edition
client *fasthttp.Client
}

func (n *cluster5x) getVersion() string {
Expand All @@ -29,7 +32,7 @@ func (n *cluster5x) getLicense() (lic *collector.LicenseInfo, err error) {
MaxConnections int64 `json:"max_connections"`
ExpiryAt string `json:"expiry_at"`
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/license", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/license", n.username, n.password, &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -60,7 +63,7 @@ func (n *cluster5x) getClusterStatus() (cluster collector.ClusterStatus, err err
Load5 any `json:"load5"`
Load15 any `json:"load15"`
}{{}}
err = callHTTPGetWithResp(n.client, "/api/v5/nodes", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/nodes", n.username, n.password, &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -106,7 +109,7 @@ func (n *cluster5x) getBrokerMetrics() (metrics *collector.Broker, err error) {
SentMsgRate int64 `json:"sent_msg_rate"`
ReceivedMsgRate int64 `json:"received_msg_rate"`
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/monitor_current", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/monitor_current", n.username, n.password, &resp)
if err != nil {
return
}
Expand All @@ -126,7 +129,7 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
Enable bool
}
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/rules?limit=10000", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/rules?limit=10000", n.username, n.password, &resp)
if err != nil {
return
}
Expand All @@ -140,12 +143,12 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
NodeMetrics []struct {
Node string
Metrics struct {
Rate float64 `json:"matched.rate"`
RateLast5m float64 `json:"matched.rate.last5m"`
RateMax float64 `json:"matched.rate.max"`
Matched int64
Passed int64
Failed int64
Rate float64 `json:"matched.rate"`
RateLast5m float64 `json:"matched.rate.last5m"`
RateMax float64 `json:"matched.rate.max"`
Matched int64
Passed int64
Failed int64
Exception int64 `json:"failed.exception"`
NoResult int64 `json:"failed.no_result"`
ActionTotal int64 `json:"actions.total"`
Expand All @@ -154,7 +157,7 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
}
} `json:"node_metrics"`
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/rules/%s/metrics", rule.ID), &metricsResp)
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/rules/%s/metrics", rule.ID), n.username, n.password, &metricsResp)
if err != nil {
return
}
Expand Down Expand Up @@ -187,7 +190,7 @@ func (n *cluster5x) getDataBridge() (bridges []collector.DataBridge, err error)
Type string
Status string
}{{}}
err = callHTTPGetWithResp(n.client, "/api/v5/bridges", &bridgesResp)
err = callHTTPGetWithResp(n.client, "/api/v5/bridges", n.username, n.password, &bridgesResp)
if err != nil {
return
}
Expand All @@ -211,7 +214,7 @@ func (n *cluster5x) getDataBridge() (bridges []collector.DataBridge, err error)
Dropped int64
}
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/bridges/%s:%s/metrics", data.Type, data.Name), &metricsResp)
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/bridges/%s:%s/metrics", data.Type, data.Name), n.username, n.password, &metricsResp)
if err != nil {
return
}
Expand All @@ -230,7 +233,7 @@ func (n *cluster5x) getAuthenticationMetrics() (dataSources []collector.DataSour
Backend string
Enable bool
}{{}}
err = callHTTPGetWithResp(n.client, "/api/v5/authentication", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/authentication", n.username, n.password, &resp)
if err != nil {
return
}
Expand All @@ -254,7 +257,7 @@ func (n *cluster5x) getAuthenticationMetrics() (dataSources []collector.DataSour
} `json:"node_metrics"`
Status string
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authentication/%s/status", plugin.ID), &status)
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authentication/%s/status", plugin.ID), n.username, n.password, &status)
if err != nil {
return
}
Expand Down Expand Up @@ -293,7 +296,7 @@ func (n *cluster5x) getAuthorizationMetrics() (dataSources []collector.DataSourc
Enable bool
}
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/authorization/sources", &resp)
err = callHTTPGetWithResp(n.client, "/api/v5/authorization/sources", n.username, n.password, &resp)
if err != nil {
return
}
Expand All @@ -317,7 +320,7 @@ func (n *cluster5x) getAuthorizationMetrics() (dataSources []collector.DataSourc
} `json:"node_metrics"`
Status string
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authorization/sources/%s/status", plugin.Type), &status)
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authorization/sources/%s/status", plugin.Type), n.username, n.password, &status)
if err != nil {
return
}
Expand Down
103 changes: 38 additions & 65 deletions client/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,86 +3,60 @@ package client
import (
"context"
"emqx-exporter/collector"
"emqx-exporter/config"
"fmt"
"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"strconv"
"strings"
"sync"
"time"
)

var (
emqxNodes = kingpin.Flag("emqx.nodes", "The list of EMQX cluster node addr").Default("").String()
emqxUsername = kingpin.Flag("emqx.auth-username", "The username used for emqx api basic auth").Default("").String()
emqxPassword = kingpin.Flag("emqx.auth-password", "The password used for emqx api basic auth").Default("").String()
"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

type cluster struct {
client client
nodeLock sync.RWMutex
logger log.Logger
}

func NewCluster(logger log.Logger) collector.Cluster {
addrs := strings.Split(*emqxNodes, ",")
if len(addrs) == 0 {
panic(fmt.Sprintf("Invalid emqx node addrs: %s", *emqxNodes))
}
for _, addr := range addrs {
if !strings.ContainsRune(addr, ':') {
panic(fmt.Sprintf("Invalid emqx node addr: %s", addr))
func NewCluster(metrics *config.Metrics, logger log.Logger) collector.Cluster {
c := &cluster{}

go func() {
httpClient := getHTTPClient(metrics.Target)
for {
client4 := &cluster4x{
username: metrics.APIKey,
password: metrics.APISecret,
client: httpClient,
}
if _, err := client4.getClusterStatus(); err != nil {
c.client = client4
return
}

client5 := &cluster5x{
username: metrics.APIKey,
password: metrics.APISecret,
client: httpClient,
}
if _, err := client5.getClusterStatus(); err == nil {
c.client = client5
return
}

level.Error(logger).Log("msg", "Couldn't create cluster client, will retry it after 5 seconds", "err", "no cluster node found")
c.client = nil

select {
case <-context.Background().Done():
return
case <-time.After(5 * time.Second):
}
}
}

if *emqxUsername == "" {
panic("Missing username used for emqx api basic auth")
}
if *emqxPassword == "" {
panic("Missing password used for emqx api basic auth")
}

c := &cluster{
logger: logger,
}
go c.checkNodes()
}()
return c
}

func (c *cluster) checkNodes() {
httpClient := getHTTPClient(*emqxNodes)
var currentVersion string
for {
var client client
var err4, err5 error
client = &cluster4x{client: httpClient}
_, err4 = client.getClusterStatus()
if err4 != nil {
client = &cluster5x{client: httpClient}
_, err5 = client.getClusterStatus()
}
if err4 != nil && err5 != nil {
_ = level.Warn(c.logger).Log("check nodes", "couldn't get node info", "addr", *emqxNodes,
"err4", err4.Error(), "err5", err5.Error())
client = nil
} else if currentVersion != client.getVersion() {
currentVersion = client.getVersion()
_ = level.Info(c.logger).Log("ClusterVersion", currentVersion)
}

c.nodeLock.Lock()
c.client = client
c.nodeLock.Unlock()

select {
case <-context.Background().Done():
return
case <-time.After(5 * time.Second):
}
}
}

func (c *cluster) GetLicense() (lic *collector.LicenseInfo, err error) {
client := c.getNode()
if client == nil {
Expand All @@ -102,7 +76,6 @@ func (c *cluster) GetLicense() (lic *collector.LicenseInfo, err error) {
func (c *cluster) GetClusterStatus() (cluster collector.ClusterStatus, err error) {
client := c.getNode()
if client == nil {
cluster.Status = unknown
return
}
cluster, err = client.getClusterStatus()
Expand Down
Loading
Loading