diff --git a/app/app.go b/app/app.go index 9cbcf8d..0157540 100644 --- a/app/app.go +++ b/app/app.go @@ -2,6 +2,7 @@ package app import ( "context" + "crypto/tls" "fmt" consul "github.com/hashicorp/consul/api" "github.com/orange-cloudfoundry/gsloc/config" @@ -9,6 +10,7 @@ import ( "github.com/orange-cloudfoundry/gsloc/geolocs" "github.com/orange-cloudfoundry/gsloc/healthchecks" "github.com/orange-cloudfoundry/gsloc/lb" + "github.com/orange-cloudfoundry/gsloc/proxmetrics" "github.com/orange-cloudfoundry/gsloc/regs" "github.com/orange-cloudfoundry/gsloc/resolvers" "github.com/orange-cloudfoundry/gsloc/rets" @@ -16,6 +18,7 @@ import ( log "github.com/sirupsen/logrus" "google.golang.org/grpc" "net" + "net/url" "os" "os/signal" "sync" @@ -44,6 +47,7 @@ type App struct { geoLoc *geolocs.GeoLoc hcHandler *healthchecks.HcHandler grpcServer *grpc.Server + gslocConsul *disco.GslocConsul onlyServeDns bool noServeDns bool } @@ -62,6 +66,10 @@ func NewApp(cnf *config.Config, onlyServeDns, noServeDns bool) (*App, error) { if err != nil { return nil, fmt.Errorf("app loadConsulClient: %w", err) } + err = app.loadGslocConsul() + if err != nil { + return nil, fmt.Errorf("app loadGslocConsul: %w", err) + } err = app.loadConsulDiscoverer() if err != nil { return nil, fmt.Errorf("app loadConsulDiscoverer: %w", err) @@ -140,6 +148,36 @@ func (a *App) loadRetriever() error { return nil } +func (a *App) makeMetricsProxy() *proxmetrics.Fetcher { + rawConsul := fmt.Sprintf("%s://%s/v1/agent/metrics?format=prometheus", a.cnf.ConsulConfig.Scheme, a.cnf.ConsulConfig.Addr) + consulUrl, _ := url.Parse(rawConsul) // nolint + a.cnf.MetricsConfig.ProxyMetricsConfig.Targets = append( + a.cnf.MetricsConfig.ProxyMetricsConfig.Targets, + &config.ProxyMetricsTarget{ + Name: "consul", + URL: config.URLConfig{ + URL: consulUrl, + Raw: rawConsul, + }, + }, + ) + return proxmetrics.NewFetcher( + proxmetrics.NewScraper(&tls.Config{ + InsecureSkipVerify: true, + }), + a.cnf.MetricsConfig.ProxyMetricsConfig.Targets, + ) +} + +func (a *App) loadGslocConsul() error { + if a.onlyServeDns { + a.entry.Info("Only serve DNS: no gsloc consul for api") + return nil + } + a.gslocConsul = disco.NewGslocConsul(a.consulClient) + return nil +} + func (a *App) loadGeoLoc() error { if a.noServeDns { a.entry.Info("No dns server: no geoloc loaded") @@ -223,7 +261,11 @@ func (a *App) Run() error { wg.Add(1) go func() { defer wg.Done() - grpcServer := servers.NewHTTPServer(a.cnf.HTTPServer, a.hcHandler, a.grpcServer) + grpcServer := servers.NewHTTPServer( + a.cnf.HTTPServer, + a.hcHandler, a.grpcServer, + a.makeMetricsProxy(), proxmetrics.NewStatusCollector(a.gslocConsul), + ) grpcServer.Run(a.ctx) }() } diff --git a/app/grpc.go b/app/grpc.go index f75ec34..db4ff6d 100644 --- a/app/grpc.go +++ b/app/grpc.go @@ -83,7 +83,7 @@ func (a *App) loadGrpcServer() error { grpcServer := grpc.NewServer(grpcOptions...) reflection.Register(grpcServer) - serv, err := gslb.NewServer(a.consulClient) + serv, err := gslb.NewServer(a.consulClient, a.gslocConsul) if err != nil { return fmt.Errorf("agent: failed to create gslb server: %v", err) } diff --git a/config/config.go b/config/config.go index fd6e144..fe9bb5b 100644 --- a/config/config.go +++ b/config/config.go @@ -36,6 +36,7 @@ type Config struct { ConsulConfig *ConsulConfig `yaml:"consul_config"` HealthCheckConfig *HealthCheckConfig `yaml:"healthcheck_config"` GeoLoc *GeoLoc `yaml:"geo_loc"` + MetricsConfig *MetricsConfig `yaml:"metrics"` } func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { @@ -79,6 +80,13 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { if c.GeoLoc == nil { return fmt.Errorf("geo_loc is required") } + if c.MetricsConfig == nil { + c.MetricsConfig = &MetricsConfig{} + err = c.MetricsConfig.init() + if err != nil { + return err + } + } return nil } diff --git a/config/geoloc.go b/config/geoloc.go index f999346..533fc7c 100644 --- a/config/geoloc.go +++ b/config/geoloc.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/oschwald/geoip2-golang" "net" + "net/url" ) type GeoLoc struct { @@ -89,3 +90,19 @@ func (c *CIDR) UnmarshalYAML(unmarshal func(interface{}) error) error { c.IpNet = ipNet return nil } + +type URLConfig struct { + URL *url.URL + Raw string +} + +func (uc *URLConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + err := unmarshal(&s) + if err != nil { + return err + } + uc.Raw = s + uc.URL, err = url.Parse(s) + return err +} diff --git a/config/metrics.go b/config/metrics.go new file mode 100644 index 0000000..2c8b699 --- /dev/null +++ b/config/metrics.go @@ -0,0 +1,58 @@ +package config + +import ( + "fmt" + "regexp" +) + +var validateNameForPath = regexp.MustCompile(`(\s|_)`).MatchString + +type MetricsConfig struct { + ProxyMetricsConfig *ProxyMetricsConfig `yaml:"proxy"` +} + +func (u *MetricsConfig) init() error { + if u.ProxyMetricsConfig == nil { + u.ProxyMetricsConfig = &ProxyMetricsConfig{} + err := u.ProxyMetricsConfig.init() + if err != nil { + return err + } + } + return nil +} + +type ProxyMetricsConfig struct { + Targets []*ProxyMetricsTarget `yaml:"targets"` +} + +func (u *ProxyMetricsConfig) init() error { + if u.Targets == nil { + u.Targets = make([]*ProxyMetricsTarget, 0) + } + return nil +} + +type ProxyMetricsTarget struct { + Name string `yaml:"name"` + URL URLConfig `yaml:"url"` +} + +func (u *ProxyMetricsTarget) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain ProxyMetricsTarget + err := unmarshal((*plain)(u)) + if err != nil { + return err + } + if u.Name == "" { + return fmt.Errorf("proxy metrics target name is empty") + } + if validateNameForPath(u.Name) { + return fmt.Errorf("proxy metrics target name must not contains whitespace or underscore") + } + if u.URL.Raw == "" { + return fmt.Errorf("proxy metrics target url is empty") + } + + return nil +} diff --git a/disco/gsloc.go b/disco/gsloc.go new file mode 100644 index 0000000..48deacf --- /dev/null +++ b/disco/gsloc.go @@ -0,0 +1,187 @@ +package disco + +import ( + consul "github.com/hashicorp/consul/api" + "github.com/hashicorp/go-multierror" + "github.com/orange-cloudfoundry/gsloc-go-sdk/gsloc/api/config/entries/v1" + gslbsvc "github.com/orange-cloudfoundry/gsloc-go-sdk/gsloc/services/gslb/v1" + "github.com/orange-cloudfoundry/gsloc/config" + "github.com/samber/lo" + log "github.com/sirupsen/logrus" + "github.com/sourcegraph/conc/pool" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" + "strings" + "sync" +) + +type GslocConsul struct { + consulClient *consul.Client +} + +func NewGslocConsul(consulClient *consul.Client) *GslocConsul { + return &GslocConsul{ + consulClient: consulClient, + } +} + +func (c *GslocConsul) ConvertPairToSignedEntry(pair *consul.KVPair) (*entries.SignedEntry, error) { + if pair == nil { + return nil, status.Errorf(codes.NotFound, "entry not found") + } + signedEntry := &entries.SignedEntry{} + err := protojson.Unmarshal(pair.Value, signedEntry) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to unmarshal entry: %v", err) + } + return signedEntry, nil +} + +func (c *GslocConsul) ListEntries(prefix string, tags []string) ([]*entries.SignedEntry, error) { + pairs, _, err := c.consulClient.KV().List(config.ConsulKVEntriesPrefix+prefix, nil) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to list entries: %v", err) + } + + ents := make([]*entries.SignedEntry, 0, len(pairs)) + for _, pair := range pairs { + signedEntry, err := c.ConvertPairToSignedEntry(pair) + if err != nil { + return nil, err + } + hasTag := true + for _, tag := range tags { + if !lo.Contains[string](signedEntry.GetEntry().GetTags(), tag) { + hasTag = false + break + } + } + if !hasTag { + continue + } + ents = append(ents, signedEntry) + } + return ents, nil +} + +func (c *GslocConsul) ListEntriesStatus(prefix string, tags []string) ([]*gslbsvc.GetEntryStatusResponse, error) { + ents, err := c.ListEntries(prefix, tags) + if err != nil { + return nil, err + } + var entsStatus []*gslbsvc.GetEntryStatusResponse + var chanEntsStatus = make(chan *gslbsvc.GetEntryStatusResponse, 100) + p := pool.New().WithMaxGoroutines(10) + done := make(chan struct{}) + go func() { + for entStatus := range chanEntsStatus { + entsStatus = append(entsStatus, entStatus) + } + done <- struct{}{} + }() + lockErr := &sync.Mutex{} + var errResult error + for _, ent := range ents { + ent := ent + p.Go(func() { + entStatus, err := c.GetEntryStatus(ent.GetEntry().GetFqdn()) + if err != nil { + lockErr.Lock() + errResult = multierror.Append(errResult, err) + lockErr.Unlock() + } + chanEntsStatus <- entStatus + }) + } + p.Wait() + close(chanEntsStatus) + <-done + return entsStatus, errResult +} + +func (c *GslocConsul) GetEntryStatus(fqdn string) (*gslbsvc.GetEntryStatusResponse, error) { + pair, _, err := c.consulClient.KV().Get(config.ConsulKVEntriesPrefix+fqdn, nil) + if err != nil { + if strings.Contains(err.Error(), "not found") { + return nil, status.Errorf(codes.NotFound, "entry not found") + } + return nil, status.Errorf(codes.Internal, "failed to get entry: %v", err) + } + + signedEntry, err := c.ConvertPairToSignedEntry(pair) + if err != nil { + return nil, err + } + + resp := &gslbsvc.GetEntryStatusResponse{ + Fqdn: fqdn, + MembersIpv4: make([]*gslbsvc.MemberStatus, 0), + MembersIpv6: make([]*gslbsvc.MemberStatus, 0), + } + msMap := map[string]*gslbsvc.MemberStatus{} + for _, member := range signedEntry.GetEntry().GetMembersIpv4() { + ms := &gslbsvc.MemberStatus{ + Ip: member.GetIp(), + Dc: member.GetDc(), + Status: gslbsvc.MemberStatus_OFFLINE, + FailureReason: "", + } + msMap[fqdn+ms.GetIp()] = ms + resp.MembersIpv4 = append(resp.MembersIpv4, ms) + } + for _, member := range signedEntry.GetEntry().GetMembersIpv6() { + ms := &gslbsvc.MemberStatus{ + Ip: member.GetIp(), + Dc: member.GetDc(), + Status: gslbsvc.MemberStatus_OFFLINE, + FailureReason: "", + } + msMap[fqdn+ms.GetIp()] = ms + resp.MembersIpv4 = append(resp.MembersIpv4, ms) + } + + ents, _, err := c.consulClient.Health().Service(fqdn, "", false, &consul.QueryOptions{}) + if err != nil { + if strings.Contains(err.Error(), "not found") { + return nil, status.Errorf(codes.NotFound, "entry not found") + } + return nil, status.Errorf(codes.Internal, "failed to get health: %v", err) + } + for _, ent := range ents { + ms, ok := msMap[fqdn+ent.Service.Address] + if !ok { + continue + } + var check *consul.HealthCheck + for _, c := range ent.Checks { + if c.Type == "http" { + check = c + break + } + } + if check == nil { + log.Warnf("no http check found for %s", ent.Service.Address) + continue + } + if check.Status == consul.HealthPassing { + ms.Status = gslbsvc.MemberStatus_ONLINE + } else { + ms.Status = gslbsvc.MemberStatus_CHECK_FAILED + ms.FailureReason = check.Output + } + } + return resp, nil +} + +func (c *GslocConsul) RetrieveSignedEntry(fqdn string) (*entries.SignedEntry, error) { + pair, _, err := c.consulClient.KV().Get(config.ConsulKVEntriesPrefix+fqdn, nil) + if err != nil { + return nil, err + } + signedEntry, err := c.ConvertPairToSignedEntry(pair) + if err != nil { + return nil, err + } + return signedEntry, nil +} diff --git a/go.mod b/go.mod index 194c057..ac6e968 100644 --- a/go.mod +++ b/go.mod @@ -12,9 +12,12 @@ require ( github.com/miekg/dns v1.1.55 github.com/onsi/ginkgo/v2 v2.12.0 github.com/onsi/gomega v1.27.10 - github.com/orange-cloudfoundry/gsloc-go-sdk v0.3.0 + github.com/orange-cloudfoundry/gsloc-go-sdk v0.4.0 github.com/oschwald/geoip2-golang v1.9.0 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 + github.com/prometheus/client_model v0.3.0 + github.com/prometheus/common v0.42.0 github.com/quic-go/quic-go v0.38.1 github.com/samber/lo v1.38.1 github.com/sirupsen/logrus v1.9.3 @@ -51,8 +54,6 @@ require ( github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/oschwald/maxminddb-golang v1.11.0 // indirect - github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/qtls-go1-20 v0.3.3 // indirect diff --git a/go.sum b/go.sum index d212305..91646e5 100644 --- a/go.sum +++ b/go.sum @@ -166,8 +166,8 @@ github.com/onsi/ginkgo/v2 v2.12.0 h1:UIVDowFPwpg6yMUpPjGkYvf06K3RAiJXUhCxEwQVHRI github.com/onsi/ginkgo/v2 v2.12.0/go.mod h1:ZNEzXISYlqpb8S36iN71ifqLi3vVD1rVJGvWRCJOUpQ= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= -github.com/orange-cloudfoundry/gsloc-go-sdk v0.3.0 h1:NgYINWEci7ckQ4qyVEXTXjtqtpQ/5pPXdk4m/QraRpI= -github.com/orange-cloudfoundry/gsloc-go-sdk v0.3.0/go.mod h1:VdojO3XZ/TArQAXbbcCoOk2q6RTNbAo2X4Eit1vF6jg= +github.com/orange-cloudfoundry/gsloc-go-sdk v0.4.0 h1:Av1LVRMcd6Ko56nUYlF1V5kjZ2Yx8Lilf9Vb6PEnByM= +github.com/orange-cloudfoundry/gsloc-go-sdk v0.4.0/go.mod h1:VdojO3XZ/TArQAXbbcCoOk2q6RTNbAo2X4Eit1vF6jg= github.com/oschwald/geoip2-golang v1.9.0 h1:uvD3O6fXAXs+usU+UGExshpdP13GAqp4GBrzN7IgKZc= github.com/oschwald/geoip2-golang v1.9.0/go.mod h1:BHK6TvDyATVQhKNbQBdrj9eAvuwOMi2zSFXizL3K81Y= github.com/oschwald/maxminddb-golang v1.11.0 h1:aSXMqYR/EPNjGE8epgqwDay+P30hCBZIveY0WZbAWh0= @@ -178,6 +178,7 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= diff --git a/gslb/entry.go b/gslb/entry.go index 451b457..e4f39ac 100644 --- a/gslb/entry.go +++ b/gslb/entry.go @@ -8,8 +8,6 @@ import ( gslbsvc "github.com/orange-cloudfoundry/gsloc-go-sdk/gsloc/services/gslb/v1" "github.com/orange-cloudfoundry/gsloc-go-sdk/helpers" "github.com/orange-cloudfoundry/gsloc/config" - "github.com/samber/lo" - log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/protojson" @@ -45,79 +43,19 @@ func (s *Server) SetEntry(ctx context.Context, request *gslbsvc.SetEntryRequest) return &emptypb.Empty{}, nil } -func (s *Server) GetEntryStatus(ctx context.Context, req *gslbsvc.GetEntryStatusRequest) (*gslbsvc.GetEntryStatusResponse, error) { - fqdn := dns.CanonicalName(req.GetFqdn()) - pair, _, err := s.consulClient.KV().Get(config.ConsulKVEntriesPrefix+fqdn, nil) - if err != nil { - if strings.Contains(err.Error(), "not found") { - return nil, status.Errorf(codes.NotFound, "entry not found") - } - return nil, status.Errorf(codes.Internal, "failed to get entry: %v", err) - } - - signedEntry, err := s.convertPairToSignedEntry(pair) +func (s *Server) ListEntriesStatus(ctx context.Context, req *gslbsvc.ListEntriesStatusRequest) (*gslbsvc.ListEntriesStatusResponse, error) { + allEntriesStatus, err := s.gslocConsul.ListEntriesStatus(req.GetPrefix(), req.GetTags()) if err != nil { return nil, err } + return &gslbsvc.ListEntriesStatusResponse{ + EntriesStatus: allEntriesStatus, + }, nil +} - resp := &gslbsvc.GetEntryStatusResponse{ - Fqdn: fqdn, - MembersIpv4: make([]*gslbsvc.MemberStatus, 0), - MembersIpv6: make([]*gslbsvc.MemberStatus, 0), - } - msMap := map[string]*gslbsvc.MemberStatus{} - for _, member := range signedEntry.GetEntry().GetMembersIpv4() { - ms := &gslbsvc.MemberStatus{ - Ip: member.GetIp(), - Dc: member.GetDc(), - Status: gslbsvc.MemberStatus_OFFLINE, - FailureReason: "", - } - msMap[fqdn+ms.GetIp()] = ms - resp.MembersIpv4 = append(resp.MembersIpv4, ms) - } - for _, member := range signedEntry.GetEntry().GetMembersIpv6() { - ms := &gslbsvc.MemberStatus{ - Ip: member.GetIp(), - Dc: member.GetDc(), - Status: gslbsvc.MemberStatus_OFFLINE, - FailureReason: "", - } - msMap[fqdn+ms.GetIp()] = ms - resp.MembersIpv4 = append(resp.MembersIpv4, ms) - } - - ents, _, err := s.consulClient.Health().Service(fqdn, "", false, &consul.QueryOptions{}) - if err != nil { - if strings.Contains(err.Error(), "not found") { - return nil, status.Errorf(codes.NotFound, "entry not found") - } - return nil, status.Errorf(codes.Internal, "failed to get health: %v", err) - } - for _, ent := range ents { - ms, ok := msMap[fqdn+ent.Service.Address] - if !ok { - continue - } - var check *consul.HealthCheck - for _, c := range ent.Checks { - if c.Type == "http" { - check = c - break - } - } - if check == nil { - log.Warnf("no http check found for %s", ent.Service.Address) - continue - } - if check.Status == consul.HealthPassing { - ms.Status = gslbsvc.MemberStatus_ONLINE - } else { - ms.Status = gslbsvc.MemberStatus_CHECK_FAILED - ms.FailureReason = check.Output - } - } - return resp, nil +func (s *Server) GetEntryStatus(ctx context.Context, req *gslbsvc.GetEntryStatusRequest) (*gslbsvc.GetEntryStatusResponse, error) { + fqdn := dns.CanonicalName(req.GetFqdn()) + return s.gslocConsul.GetEntryStatus(fqdn) } func (s *Server) setSignedEntry(entry *entries.SignedEntry) error { @@ -191,7 +129,7 @@ func (s *Server) GetEntry(ctx context.Context, request *gslbsvc.GetEntryRequest) return nil, status.Errorf(codes.Internal, "failed to get entry: %v", err) } - signedEntry, err := s.convertPairToSignedEntry(pair) + signedEntry, err := s.gslocConsul.ConvertPairToSignedEntry(pair) if err != nil { return nil, err } @@ -216,7 +154,7 @@ func (s *Server) GetEntryWithStatus(ctx context.Context, request *gslbsvc.GetEnt return nil, status.Errorf(codes.Internal, "failed to get entry: %v", err) } - signedEntry, err := s.convertPairToSignedEntry(pair) + signedEntry, err := s.gslocConsul.ConvertPairToSignedEntry(pair) if err != nil { return nil, err } @@ -226,24 +164,12 @@ func (s *Server) GetEntryWithStatus(ctx context.Context, request *gslbsvc.GetEnt }, nil } -func (s *Server) convertPairToSignedEntry(pair *consul.KVPair) (*entries.SignedEntry, error) { - if pair == nil { - return nil, status.Errorf(codes.NotFound, "entry not found") - } - signedEntry := &entries.SignedEntry{} - err := protojson.Unmarshal(pair.Value, signedEntry) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to unmarshal entry: %v", err) - } - return signedEntry, nil -} - func (s *Server) ListEntries(ctx context.Context, request *gslbsvc.ListEntriesRequest) (*gslbsvc.ListEntriesResponse, error) { err := request.ValidateAll() if err != nil { return nil, status.Errorf(codes.InvalidArgument, "invalid request: %v", err) } - signedEnts, err := s.listEntries(request.GetPrefix(), request.GetTags()) + signedEnts, err := s.gslocConsul.ListEntries(request.GetPrefix(), request.GetTags()) if err != nil { return nil, err } @@ -258,42 +184,3 @@ func (s *Server) ListEntries(ctx context.Context, request *gslbsvc.ListEntriesRe Entries: ents, }, nil } - -func (s *Server) listEntries(prefix string, tags []string) ([]*entries.SignedEntry, error) { - pairs, _, err := s.consulClient.KV().List(config.ConsulKVEntriesPrefix+prefix, nil) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to list entries: %v", err) - } - - ents := make([]*entries.SignedEntry, 0, len(pairs)) - for _, pair := range pairs { - signedEntry, err := s.convertPairToSignedEntry(pair) - if err != nil { - return nil, err - } - hasTag := true - for _, tag := range tags { - if !lo.Contains[string](signedEntry.GetEntry().GetTags(), tag) { - hasTag = false - break - } - } - if !hasTag { - continue - } - ents = append(ents, signedEntry) - } - return ents, nil -} - -func (s *Server) retrieveSignedEntry(fqdn string) (*entries.SignedEntry, error) { - pair, _, err := s.consulClient.KV().Get(config.ConsulKVEntriesPrefix+fqdn, nil) - if err != nil { - return nil, err - } - signedEntry, err := s.convertPairToSignedEntry(pair) - if err != nil { - return nil, err - } - return signedEntry, nil -} diff --git a/gslb/healthcheck.go b/gslb/healthcheck.go index 8dfbb15..988cd66 100644 --- a/gslb/healthcheck.go +++ b/gslb/healthcheck.go @@ -16,7 +16,7 @@ func (s *Server) SetHealthCheck(ctx context.Context, request *gslbsvc.SetHealthC } fqdn := dns.CanonicalName(request.GetFqdn()) - signedEntry, err := s.retrieveSignedEntry(fqdn) + signedEntry, err := s.gslocConsul.RetrieveSignedEntry(fqdn) if err != nil { return nil, err } @@ -36,7 +36,7 @@ func (s *Server) GetHealthCheck(ctx context.Context, request *gslbsvc.GetHealthC } fqdn := dns.CanonicalName(request.GetFqdn()) - signedEntry, err := s.retrieveSignedEntry(fqdn) + signedEntry, err := s.gslocConsul.RetrieveSignedEntry(fqdn) if err != nil { return nil, err } diff --git a/gslb/member.go b/gslb/member.go index 06c4fc4..c2d8160 100644 --- a/gslb/member.go +++ b/gslb/member.go @@ -33,7 +33,7 @@ func (s *Server) SetMember(ctx context.Context, request *gslbsvc.SetMemberReques fqdn := dns.CanonicalName(request.GetFqdn()) - signedEntry, err := s.retrieveSignedEntry(fqdn) + signedEntry, err := s.gslocConsul.RetrieveSignedEntry(fqdn) if err != nil { return nil, err } @@ -68,7 +68,7 @@ func (s *Server) DeleteMember(ctx context.Context, request *gslbsvc.DeleteMember fqdn := dns.CanonicalName(request.GetFqdn()) - signedEntry, err := s.retrieveSignedEntry(fqdn) + signedEntry, err := s.gslocConsul.RetrieveSignedEntry(fqdn) if err != nil { return nil, err } @@ -120,7 +120,7 @@ func (s *Server) SetMembersStatus(ctx context.Context, request *gslbsvc.SetMembe return nil, status.Errorf(codes.InvalidArgument, "invalid request: %v", err) } - signedEnts, err := s.listEntries(request.Prefix, request.Tags) + signedEnts, err := s.gslocConsul.ListEntries(request.Prefix, request.Tags) if err != nil { return nil, err } @@ -195,7 +195,7 @@ func (s *Server) GetMember(ctx context.Context, request *gslbsvc.GetMemberReques fqdn := dns.CanonicalName(request.GetFqdn()) - signedEntry, err := s.retrieveSignedEntry(fqdn) + signedEntry, err := s.gslocConsul.RetrieveSignedEntry(fqdn) if err != nil { return nil, err } @@ -222,7 +222,7 @@ func (s *Server) ListMembers(ctx context.Context, request *gslbsvc.ListMembersRe fqdn := dns.CanonicalName(request.GetFqdn()) - signedEntry, err := s.retrieveSignedEntry(fqdn) + signedEntry, err := s.gslocConsul.RetrieveSignedEntry(fqdn) if err != nil { return nil, err } diff --git a/gslb/server.go b/gslb/server.go index 3b72c8a..586ca35 100644 --- a/gslb/server.go +++ b/gslb/server.go @@ -3,16 +3,19 @@ package gslb import ( consul "github.com/hashicorp/consul/api" gslbsvc "github.com/orange-cloudfoundry/gsloc-go-sdk/gsloc/services/gslb/v1" + "github.com/orange-cloudfoundry/gsloc/disco" ) type Server struct { consulClient *consul.Client + gslocConsul *disco.GslocConsul gslbsvc.UnimplementedGSLBServer } -func NewServer(consulClient *consul.Client) (*Server, error) { +func NewServer(consulClient *consul.Client, gslocConsul *disco.GslocConsul) (*Server, error) { s := &Server{ consulClient: consulClient, + gslocConsul: gslocConsul, } return s, nil } diff --git a/proxmetrics/errors.go b/proxmetrics/errors.go new file mode 100644 index 0000000..07990d9 --- /dev/null +++ b/proxmetrics/errors.go @@ -0,0 +1,36 @@ +package proxmetrics + +import ( + "fmt" + "net/http" + "net/url" +) + +func ErrNoInstanceFound(instanceUuid string) *ErrFetch { + return &ErrFetch{ + Code: http.StatusNotFound, + Message: "Cannot found instance " + instanceUuid, + } +} + +func ErrNoEndpointFound(appIdOrPath, endpoint string) *ErrFetch { + appIdOrPathTmp, err := url.PathUnescape(appIdOrPath) + if err == nil { + appIdOrPath = appIdOrPathTmp + } + return &ErrFetch{ + Code: http.StatusNotAcceptable, + Message: fmt.Sprintf( + "Cannot found endpoint '%s' for app with id or path '%s', please create one", endpoint, appIdOrPath, + ), + } +} + +type ErrFetch struct { + Code int + Message string +} + +func (e ErrFetch) Error() string { + return fmt.Sprintf("%d %s\n", e.Code, e.Message) +} diff --git a/proxmetrics/fetcher.go b/proxmetrics/fetcher.go new file mode 100644 index 0000000..92302b6 --- /dev/null +++ b/proxmetrics/fetcher.go @@ -0,0 +1,198 @@ +package proxmetrics + +import ( + "github.com/orange-cloudfoundry/gsloc/config" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus/promhttp" + log "github.com/sirupsen/logrus" + "net/http" + "sync" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" +) + +func ptrString(v string) *string { + return &v +} + +type Fetcher struct { + scraper *Scraper + targets []*config.ProxyMetricsTarget +} + +func NewFetcher(scraper *Scraper, targets []*config.ProxyMetricsTarget) *Fetcher { + return &Fetcher{ + scraper: scraper, + targets: targets, + } +} + +func (f Fetcher) ServeHTTP(w http.ResponseWriter, req *http.Request) { + promhttp.HandlerFor(f, promhttp.HandlerOpts{}).ServeHTTP(w, req) +} + +func (f Fetcher) Gather() ([]*dto.MetricFamily, error) { + jobs := make(chan *config.ProxyMetricsTarget, len(f.targets)) + errFetch := &ErrFetch{} + wg := &sync.WaitGroup{} + + muWrite := sync.Mutex{} + metricsUnmerged := make([]*dto.MetricFamily, 0) + + wg.Add(len(f.targets)) + for w := 1; w <= 5; w++ { + go func(jobs <-chan *config.ProxyMetricsTarget, errFetch *ErrFetch) { + for j := range jobs { + newMetrics, err := f.Metric(j) + if err != nil { + if errF, ok := err.(*ErrFetch); ok { + muWrite.Lock() + *errFetch = *errF + muWrite.Unlock() + wg.Done() + continue + } + log.Warnf("Cannot get metric for target %s", j.Name) + newMetrics = f.scrapeError(j, err) + } + muWrite.Lock() + metricsUnmerged = append(metricsUnmerged, newMetrics...) + muWrite.Unlock() + wg.Done() + } + }(jobs, errFetch) + } + for _, target := range f.targets { + jobs <- target + } + wg.Wait() + close(jobs) + if errFetch.Code != 0 { + return make([]*dto.MetricFamily, 0), errFetch + } + gat := prometheus.ToTransactionalGatherer(prometheus.DefaultGatherer) + mfs, done, err := gat.Gather() + defer done() + if err != nil { + return nil, errors.Wrap(err, "cannot gather prometheus metrics") + } + for _, mf := range mfs { + f.addTargetName(mf, "gsloc") + } + metricsUnmerged = append(metricsUnmerged, mfs...) + + if len(metricsUnmerged) == 0 { + return make([]*dto.MetricFamily, 0), nil + } + + return f.cleanDuplicate(metricsUnmerged), nil +} + +func (f Fetcher) cleanDuplicate(mfs []*dto.MetricFamily) []*dto.MetricFamily { + mfMap := make(map[string]*dto.MetricFamily) + for _, mf := range mfs { + if _, ok := mfMap[mf.GetName()]; !ok { + mfMap[mf.GetName()] = &dto.MetricFamily{ + Name: mf.Name, + Help: mf.Help, + Type: mf.Type, + Metric: mf.Metric, + } + continue + } + elem := mfMap[mf.GetName()] + if elem.Help == nil { + elem.Help = mf.Help + } + if elem.Type == nil { + elem.Type = mf.Type + } + elem.Metric = append(mfMap[mf.GetName()].Metric, mf.Metric...) + } + mfs = make([]*dto.MetricFamily, 0) + for _, mf := range mfMap { + mfs = append(mfs, mf) + } + return mfs +} + +func (f Fetcher) Metric(target *config.ProxyMetricsTarget) ([]*dto.MetricFamily, error) { + reader, err := f.scraper.Scrape(target) + if err != nil { + return nil, err + } + defer reader.Close() + parser := &expfmt.TextParser{} + metricsGroup, err := parser.TextToMetricFamilies(reader) + if err != nil { + return nil, err + } + + for _, metricGroup := range metricsGroup { + f.addTargetName(metricGroup, target.Name) + } + finalMetrics := make([]*dto.MetricFamily, len(metricsGroup)) + i := 0 + for _, metricGroup := range metricsGroup { + finalMetrics[i] = metricGroup + i++ + } + return finalMetrics, nil +} + +func (f Fetcher) addTargetName(mf *dto.MetricFamily, targetName string) { + for _, metric := range mf.Metric { + metric.Label = f.cleanMetricLabels( + metric.Label, + "target", + ) + metric.Label = append(metric.Label, &dto.LabelPair{ + Name: ptrString("target"), + Value: ptrString(targetName), + }) + } +} + +func (f Fetcher) cleanMetricLabels(labels []*dto.LabelPair, names ...string) []*dto.LabelPair { + finalLabels := make([]*dto.LabelPair, 0) + for _, label := range labels { + toAdd := true + for _, name := range names { + if label.Name != nil && *label.Name == name { + toAdd = false + break + } + } + if toAdd { + finalLabels = append(finalLabels, label) + } + } + return finalLabels +} + +func (f Fetcher) scrapeError(target *config.ProxyMetricsTarget, err error) []*dto.MetricFamily { + name := "gsloc_proxmetrics_scrape_error" + help := "Gsloc proxy metrics scrap error on one agent" + metric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: name, + Help: help, + ConstLabels: prometheus.Labels{ + "target": target.Name, + "error": err.Error(), + }, + }) + metric.Inc() + var dtoMetric dto.Metric + metric.Write(&dtoMetric) // nolint: errcheck + metricType := dto.MetricType_COUNTER + return []*dto.MetricFamily{ + { + Name: ptrString(name), + Help: ptrString(help), + Type: &metricType, + Metric: []*dto.Metric{&dtoMetric}, + }, + } +} diff --git a/proxmetrics/scrapper.go b/proxmetrics/scrapper.go new file mode 100644 index 0000000..e27fb41 --- /dev/null +++ b/proxmetrics/scrapper.go @@ -0,0 +1,106 @@ +package proxmetrics + +import ( + "compress/gzip" + "crypto/tls" + "fmt" + "github.com/orange-cloudfoundry/gsloc/config" + "io" + "net" + "net/http" + "time" +) + +const acceptHeader = `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1` + +type Scraper struct { + httpClient *http.Client +} + +func NewScraper(tlsConf *tls.Config) *Scraper { + dialer := &net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + } + httpClient := &http.Client{ + Timeout: time.Second * 10, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: dialer.DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + TLSClientConfig: tlsConf, + }, + } + return &Scraper{ + httpClient: httpClient, + } + +} +func (s Scraper) Scrape(target *config.ProxyMetricsTarget) (io.ReadCloser, error) { + req, err := http.NewRequest("GET", target.URL.URL.String(), nil) + if err != nil { + return nil, err + } + req.Header.Add("Accept", acceptHeader) + req.Header.Add("Accept-Encoding", "gzip") + req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", (30*time.Second).Seconds())) + req.Header.Set("X-Proxy-Scrapping", "true") + resp, err := s.httpClient.Do(req) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + if resp.StatusCode >= 400 && resp.StatusCode <= 499 { + return nil, ErrNoEndpointFound( + fmt.Sprintf( + "Target %s (status code %d)", + target.Name, + resp.StatusCode, + ), target.URL.URL.String(), + ) + } + return nil, fmt.Errorf("server returned HTTP status %s", resp.Status) + } + + if resp.Header.Get("Content-Encoding") != "gzip" { + return resp.Body, nil + } + gzReader, err := NewReaderGzip(resp.Body) + if err != nil { + resp.Body.Close() + return nil, err + } + return gzReader, nil +} + +type ReaderGzip struct { + main io.ReadCloser + gzip *gzip.Reader +} + +func NewReaderGzip(main io.ReadCloser) (*ReaderGzip, error) { + gzReader, err := gzip.NewReader(main) + if err != nil { + return nil, err + } + return &ReaderGzip{ + main: main, + gzip: gzReader, + }, nil +} + +func (r ReaderGzip) Read(p []byte) (n int, err error) { + return r.gzip.Read(p) +} + +func (r ReaderGzip) Close() error { + r.gzip.Close() + r.main.Close() + return nil +} diff --git a/proxmetrics/status.go b/proxmetrics/status.go new file mode 100644 index 0000000..3ced6e6 --- /dev/null +++ b/proxmetrics/status.go @@ -0,0 +1,80 @@ +package proxmetrics + +import ( + gslbsvc "github.com/orange-cloudfoundry/gsloc-go-sdk/gsloc/services/gslb/v1" + "github.com/orange-cloudfoundry/gsloc/disco" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + log "github.com/sirupsen/logrus" + "net/http" +) + +type StatusCollector struct { + desc *prometheus.Desc + gslocConsul *disco.GslocConsul +} + +func StatusHandler(collector *StatusCollector) http.Handler { + registry := prometheus.NewRegistry() + registry.MustRegister(collector) + return promhttp.InstrumentMetricHandler( + registry, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}), + ) +} + +func NewStatusCollector(gslocConsul *disco.GslocConsul) *StatusCollector { + return &StatusCollector{ + gslocConsul: gslocConsul, + desc: prometheus.NewDesc( + "gsloc_entry_status", + "Get entry status information. 0 means online, 1 means check failed, 2 means offline (disabled by user), 3 means unknown.", + []string{"fqdn", "member_ip", "dc", "type"}, + nil, + ), + } +} + +func (s *StatusCollector) Describe(descs chan<- *prometheus.Desc) { + descs <- s.desc +} + +func (s *StatusCollector) Collect(metrics chan<- prometheus.Metric) { + entriesStatus, err := s.gslocConsul.ListEntriesStatus("", []string{}) + if err != nil { + log.WithError(err).Error("Failed to list entries status") + return + } + for _, entryStatus := range entriesStatus { + for _, entry := range entryStatus.MembersIpv4 { + metrics <- s.memberStatusToProm(entryStatus.Fqdn, entry, true) + } + for _, entry := range entryStatus.MembersIpv6 { + metrics <- s.memberStatusToProm(entryStatus.Fqdn, entry, false) + } + } +} + +func (s *StatusCollector) memberStatusToProm(fqdn string, ms *gslbsvc.MemberStatus, isIpv4 bool) prometheus.Metric { + val := 3 + switch ms.GetStatus() { + case gslbsvc.MemberStatus_ONLINE: + val = 0 + case gslbsvc.MemberStatus_OFFLINE: + val = 2 + case gslbsvc.MemberStatus_CHECK_FAILED: + val = 1 + } + typeMember := "ipv6" + if isIpv4 { + typeMember = "ipv4" + } + return prometheus.MustNewConstMetric( + s.desc, + prometheus.GaugeValue, + float64(val), + fqdn, + ms.GetIp(), + ms.GetDc(), + typeMember, + ) +} diff --git a/resolvers/handler.go b/resolvers/handler.go index aafb6f4..01a0af5 100644 --- a/resolvers/handler.go +++ b/resolvers/handler.go @@ -134,6 +134,9 @@ func (h *GSLBHandler) Resolve(ctx context.Context, fqdn string, queryType uint16 log.Errorf("error finding members: %s", err.Error()) return []dns.RR{} } + if len(members) == 0 { + return []dns.RR{} + } rrs := make([]dns.RR, 0) for _, member := range members { rr, err := dns.NewRR( diff --git a/servers/http.go b/servers/http.go index 2279ee6..2dcf3ca 100644 --- a/servers/http.go +++ b/servers/http.go @@ -5,29 +5,39 @@ import ( "fmt" "github.com/orange-cloudfoundry/gsloc/config" "github.com/orange-cloudfoundry/gsloc/healthchecks" + "github.com/orange-cloudfoundry/gsloc/proxmetrics" "google.golang.org/grpc" "net/http" "strings" "time" "github.com/gorilla/mux" - "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" ) type HTTPServer struct { - mux *mux.Router - cnf *config.HTTPServerConfig - hcker *healthchecks.HcHandler - grpcServ *grpc.Server + mux *mux.Router + cnf *config.HTTPServerConfig + hcker *healthchecks.HcHandler + grpcServ *grpc.Server + metricsFetcher *proxmetrics.Fetcher + statusCollector *proxmetrics.StatusCollector } -func NewHTTPServer(cnf *config.HTTPServerConfig, hcker *healthchecks.HcHandler, grpcServ *grpc.Server) *HTTPServer { +func NewHTTPServer( + cnf *config.HTTPServerConfig, + hcker *healthchecks.HcHandler, + grpcServ *grpc.Server, + metricsFetcher *proxmetrics.Fetcher, + statusCollector *proxmetrics.StatusCollector, +) *HTTPServer { return &HTTPServer{ - mux: mux.NewRouter(), - cnf: cnf, - hcker: hcker, - grpcServ: grpcServ, + mux: mux.NewRouter(), + cnf: cnf, + hcker: hcker, + grpcServ: grpcServ, + metricsFetcher: metricsFetcher, + statusCollector: statusCollector, } } @@ -44,7 +54,8 @@ func (s *HTTPServer) ServeHTTP(writer http.ResponseWriter, request *http.Request } func (s *HTTPServer) Run(ctx context.Context) { - s.mux.Path("/metrics").Handler(promhttp.Handler()) + s.mux.Path("/metrics").Handler(s.metricsFetcher) + s.mux.Path("/metrics/status").Handler(proxmetrics.StatusHandler(s.statusCollector)) s.mux.Methods("POST").Path("/hc/{fqdn}/member/{ip}").Handler(s.hcker) srvTls := &http.Server{