Skip to content

Commit

Permalink
add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ArthurHlt committed Sep 9, 2023
1 parent 3601701 commit 8f1e10c
Show file tree
Hide file tree
Showing 18 changed files with 789 additions and 151 deletions.
44 changes: 43 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@ package app

import (
"context"
"crypto/tls"
"fmt"
consul "github.com/hashicorp/consul/api"
"github.com/orange-cloudfoundry/gsloc/config"
"github.com/orange-cloudfoundry/gsloc/disco"
"github.com/orange-cloudfoundry/gsloc/geolocs"
"github.com/orange-cloudfoundry/gsloc/healthchecks"
"github.com/orange-cloudfoundry/gsloc/lb"
"github.com/orange-cloudfoundry/gsloc/proxmetrics"
"github.com/orange-cloudfoundry/gsloc/regs"
"github.com/orange-cloudfoundry/gsloc/resolvers"
"github.com/orange-cloudfoundry/gsloc/rets"
"github.com/orange-cloudfoundry/gsloc/servers"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"net"
"net/url"
"os"
"os/signal"
"sync"
Expand Down Expand Up @@ -44,6 +47,7 @@ type App struct {
geoLoc *geolocs.GeoLoc
hcHandler *healthchecks.HcHandler
grpcServer *grpc.Server
gslocConsul *disco.GslocConsul
onlyServeDns bool
noServeDns bool
}
Expand All @@ -62,6 +66,10 @@ func NewApp(cnf *config.Config, onlyServeDns, noServeDns bool) (*App, error) {
if err != nil {
return nil, fmt.Errorf("app loadConsulClient: %w", err)
}
err = app.loadGslocConsul()
if err != nil {
return nil, fmt.Errorf("app loadGslocConsul: %w", err)
}
err = app.loadConsulDiscoverer()
if err != nil {
return nil, fmt.Errorf("app loadConsulDiscoverer: %w", err)
Expand Down Expand Up @@ -140,6 +148,36 @@ func (a *App) loadRetriever() error {
return nil
}

func (a *App) makeMetricsProxy() *proxmetrics.Fetcher {
rawConsul := fmt.Sprintf("%s://%s/v1/agent/metrics?format=prometheus", a.cnf.ConsulConfig.Scheme, a.cnf.ConsulConfig.Addr)
consulUrl, _ := url.Parse(rawConsul) // nolint
a.cnf.MetricsConfig.ProxyMetricsConfig.Targets = append(
a.cnf.MetricsConfig.ProxyMetricsConfig.Targets,
&config.ProxyMetricsTarget{
Name: "consul",
URL: config.URLConfig{
URL: consulUrl,
Raw: rawConsul,
},
},
)
return proxmetrics.NewFetcher(
proxmetrics.NewScraper(&tls.Config{
InsecureSkipVerify: true,
}),
a.cnf.MetricsConfig.ProxyMetricsConfig.Targets,
)
}

func (a *App) loadGslocConsul() error {
if a.onlyServeDns {
a.entry.Info("Only serve DNS: no gsloc consul for api")
return nil
}
a.gslocConsul = disco.NewGslocConsul(a.consulClient)
return nil
}

func (a *App) loadGeoLoc() error {
if a.noServeDns {
a.entry.Info("No dns server: no geoloc loaded")
Expand Down Expand Up @@ -223,7 +261,11 @@ func (a *App) Run() error {
wg.Add(1)
go func() {
defer wg.Done()
grpcServer := servers.NewHTTPServer(a.cnf.HTTPServer, a.hcHandler, a.grpcServer)
grpcServer := servers.NewHTTPServer(
a.cnf.HTTPServer,
a.hcHandler, a.grpcServer,
a.makeMetricsProxy(), proxmetrics.NewStatusCollector(a.gslocConsul),
)
grpcServer.Run(a.ctx)
}()
}
Expand Down
2 changes: 1 addition & 1 deletion app/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (a *App) loadGrpcServer() error {
grpcServer := grpc.NewServer(grpcOptions...)

reflection.Register(grpcServer)
serv, err := gslb.NewServer(a.consulClient)
serv, err := gslb.NewServer(a.consulClient, a.gslocConsul)
if err != nil {
return fmt.Errorf("agent: failed to create gslb server: %v", err)
}
Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Config struct {
ConsulConfig *ConsulConfig `yaml:"consul_config"`
HealthCheckConfig *HealthCheckConfig `yaml:"healthcheck_config"`
GeoLoc *GeoLoc `yaml:"geo_loc"`
MetricsConfig *MetricsConfig `yaml:"metrics"`
}

func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
Expand Down Expand Up @@ -79,6 +80,13 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
if c.GeoLoc == nil {
return fmt.Errorf("geo_loc is required")
}
if c.MetricsConfig == nil {
c.MetricsConfig = &MetricsConfig{}
err = c.MetricsConfig.init()
if err != nil {
return err
}
}
return nil
}

Expand Down
17 changes: 17 additions & 0 deletions config/geoloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"github.com/oschwald/geoip2-golang"
"net"
"net/url"
)

type GeoLoc struct {
Expand Down Expand Up @@ -89,3 +90,19 @@ func (c *CIDR) UnmarshalYAML(unmarshal func(interface{}) error) error {
c.IpNet = ipNet
return nil
}

type URLConfig struct {
URL *url.URL
Raw string
}

func (uc *URLConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
err := unmarshal(&s)
if err != nil {
return err
}
uc.Raw = s
uc.URL, err = url.Parse(s)
return err
}
58 changes: 58 additions & 0 deletions config/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package config

import (
"fmt"
"regexp"
)

var validateNameForPath = regexp.MustCompile(`(\s|_)`).MatchString

type MetricsConfig struct {
ProxyMetricsConfig *ProxyMetricsConfig `yaml:"proxy"`
}

func (u *MetricsConfig) init() error {
if u.ProxyMetricsConfig == nil {
u.ProxyMetricsConfig = &ProxyMetricsConfig{}
err := u.ProxyMetricsConfig.init()
if err != nil {
return err
}
}
return nil
}

type ProxyMetricsConfig struct {
Targets []*ProxyMetricsTarget `yaml:"targets"`
}

func (u *ProxyMetricsConfig) init() error {
if u.Targets == nil {
u.Targets = make([]*ProxyMetricsTarget, 0)
}
return nil
}

type ProxyMetricsTarget struct {
Name string `yaml:"name"`
URL URLConfig `yaml:"url"`
}

func (u *ProxyMetricsTarget) UnmarshalYAML(unmarshal func(interface{}) error) error {
type plain ProxyMetricsTarget
err := unmarshal((*plain)(u))
if err != nil {
return err
}
if u.Name == "" {
return fmt.Errorf("proxy metrics target name is empty")
}
if validateNameForPath(u.Name) {
return fmt.Errorf("proxy metrics target name must not contains whitespace or underscore")
}
if u.URL.Raw == "" {
return fmt.Errorf("proxy metrics target url is empty")
}

return nil
}
187 changes: 187 additions & 0 deletions disco/gsloc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package disco

import (
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror"
"github.com/orange-cloudfoundry/gsloc-go-sdk/gsloc/api/config/entries/v1"
gslbsvc "github.com/orange-cloudfoundry/gsloc-go-sdk/gsloc/services/gslb/v1"
"github.com/orange-cloudfoundry/gsloc/config"
"github.com/samber/lo"
log "github.com/sirupsen/logrus"
"github.com/sourcegraph/conc/pool"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"strings"
"sync"
)

type GslocConsul struct {
consulClient *consul.Client
}

func NewGslocConsul(consulClient *consul.Client) *GslocConsul {
return &GslocConsul{
consulClient: consulClient,
}
}

func (c *GslocConsul) ConvertPairToSignedEntry(pair *consul.KVPair) (*entries.SignedEntry, error) {
if pair == nil {
return nil, status.Errorf(codes.NotFound, "entry not found")
}
signedEntry := &entries.SignedEntry{}
err := protojson.Unmarshal(pair.Value, signedEntry)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to unmarshal entry: %v", err)
}
return signedEntry, nil
}

func (c *GslocConsul) ListEntries(prefix string, tags []string) ([]*entries.SignedEntry, error) {
pairs, _, err := c.consulClient.KV().List(config.ConsulKVEntriesPrefix+prefix, nil)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to list entries: %v", err)
}

ents := make([]*entries.SignedEntry, 0, len(pairs))
for _, pair := range pairs {
signedEntry, err := c.ConvertPairToSignedEntry(pair)
if err != nil {
return nil, err
}
hasTag := true
for _, tag := range tags {
if !lo.Contains[string](signedEntry.GetEntry().GetTags(), tag) {
hasTag = false
break
}
}
if !hasTag {
continue
}
ents = append(ents, signedEntry)
}
return ents, nil
}

func (c *GslocConsul) ListEntriesStatus(prefix string, tags []string) ([]*gslbsvc.GetEntryStatusResponse, error) {
ents, err := c.ListEntries(prefix, tags)
if err != nil {
return nil, err
}
var entsStatus []*gslbsvc.GetEntryStatusResponse
var chanEntsStatus = make(chan *gslbsvc.GetEntryStatusResponse, 100)
p := pool.New().WithMaxGoroutines(10)
done := make(chan struct{})
go func() {
for entStatus := range chanEntsStatus {
entsStatus = append(entsStatus, entStatus)
}
done <- struct{}{}
}()
lockErr := &sync.Mutex{}
var errResult error
for _, ent := range ents {
ent := ent
p.Go(func() {
entStatus, err := c.GetEntryStatus(ent.GetEntry().GetFqdn())
if err != nil {
lockErr.Lock()
errResult = multierror.Append(errResult, err)
lockErr.Unlock()
}
chanEntsStatus <- entStatus
})
}
p.Wait()
close(chanEntsStatus)
<-done
return entsStatus, errResult
}

func (c *GslocConsul) GetEntryStatus(fqdn string) (*gslbsvc.GetEntryStatusResponse, error) {
pair, _, err := c.consulClient.KV().Get(config.ConsulKVEntriesPrefix+fqdn, nil)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, status.Errorf(codes.NotFound, "entry not found")
}
return nil, status.Errorf(codes.Internal, "failed to get entry: %v", err)
}

signedEntry, err := c.ConvertPairToSignedEntry(pair)
if err != nil {
return nil, err
}

resp := &gslbsvc.GetEntryStatusResponse{
Fqdn: fqdn,
MembersIpv4: make([]*gslbsvc.MemberStatus, 0),
MembersIpv6: make([]*gslbsvc.MemberStatus, 0),
}
msMap := map[string]*gslbsvc.MemberStatus{}
for _, member := range signedEntry.GetEntry().GetMembersIpv4() {
ms := &gslbsvc.MemberStatus{
Ip: member.GetIp(),
Dc: member.GetDc(),
Status: gslbsvc.MemberStatus_OFFLINE,
FailureReason: "",
}
msMap[fqdn+ms.GetIp()] = ms
resp.MembersIpv4 = append(resp.MembersIpv4, ms)
}
for _, member := range signedEntry.GetEntry().GetMembersIpv6() {
ms := &gslbsvc.MemberStatus{
Ip: member.GetIp(),
Dc: member.GetDc(),
Status: gslbsvc.MemberStatus_OFFLINE,
FailureReason: "",
}
msMap[fqdn+ms.GetIp()] = ms
resp.MembersIpv4 = append(resp.MembersIpv4, ms)
}

ents, _, err := c.consulClient.Health().Service(fqdn, "", false, &consul.QueryOptions{})
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, status.Errorf(codes.NotFound, "entry not found")
}
return nil, status.Errorf(codes.Internal, "failed to get health: %v", err)
}
for _, ent := range ents {
ms, ok := msMap[fqdn+ent.Service.Address]
if !ok {
continue
}
var check *consul.HealthCheck
for _, c := range ent.Checks {
if c.Type == "http" {
check = c
break
}
}
if check == nil {
log.Warnf("no http check found for %s", ent.Service.Address)
continue
}
if check.Status == consul.HealthPassing {
ms.Status = gslbsvc.MemberStatus_ONLINE
} else {
ms.Status = gslbsvc.MemberStatus_CHECK_FAILED
ms.FailureReason = check.Output
}
}
return resp, nil
}

func (c *GslocConsul) RetrieveSignedEntry(fqdn string) (*entries.SignedEntry, error) {
pair, _, err := c.consulClient.KV().Get(config.ConsulKVEntriesPrefix+fqdn, nil)
if err != nil {
return nil, err
}
signedEntry, err := c.ConvertPairToSignedEntry(pair)
if err != nil {
return nil, err
}
return signedEntry, nil
}
Loading

0 comments on commit 8f1e10c

Please sign in to comment.