Cherry picks for v0.40.5 (#6907)
* azure_exporter: Fix bug which prevents subscription scope from working with only a single region (#6719)

* Update postgres exporter (#6780)

* Update postgres exporter
* Make collector work with first DSN
* Enhance docs and test
* Update depcheck

* Update SNMP exporter (#6904)

* Update SNMP exporter

* Fix build issues

* Update Windows exporter

* Update Loki dependency (#6905)

* Update Loki dependency.

* Clean up metrics generated from logs after a config reload.

Synced from:
grafana/loki#12938

* Update README

* Update changelog

* Update loki dep to k190 (#6543)

* Update loki dep to k190

* port renames

* fix loki push api

* Update vmware_exporter

* Update prom encoder

---------

Co-authored-by: kgeckhart <[email protected]>
Co-authored-by: Piotr <[email protected]>
3 people authored May 15, 2024
1 parent 7bb820b commit 4a1c26f
Showing 36 changed files with 482 additions and 238 deletions.
2 changes: 1 addition & 1 deletion .github/depcheck.yml
@@ -23,7 +23,7 @@ github_repos:
- github.com/google/dnsmasq_exporter v0.2.0
- github.com/ncabatoff/process-exporter v0.7.5
- github.com/prometheus/mysqld_exporter v0.13.0
- github.com/prometheus-community/postgres_exporter v0.10.0
- github.com/prometheus-community/postgres_exporter v0.15.0
- github.com/prometheus-community/windows_exporter v0.16.0
- github.com/percona/mongodb_exporter v0.20.7
- project: github.com/prometheus/prometheus
30 changes: 30 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,36 @@ This document contains a historical list of changes between releases. Only
changes that impact end-user behavior are listed; changes to documentation or
internal API changes are not present.

v0.40.5 (2024-05-15)
--------------------

### Breaking changes

- `prometheus.exporter.postgres` has been updated to the latest upstream
version which changes the set of exported metrics. The following metrics were
removed: `pg_stat_database_session_time`, `pg_stat_database_sessions`,
`pg_stat_database_sessions_abandoned`, `pg_stat_database_sessions_fatal`,
`pg_stat_database_sessions_killed`, `pg_stat_database_idle_in_transaction_time`,
`pg_stat_database_checksum_failures`, `pg_stat_database_checksum_last_failure`,
`pg_stat_database_active_time`. The following metrics were
renamed: `pg_stat_bgwriter_buffers_alloc`, `pg_stat_bgwriter_buffers_backend`,
`pg_stat_bgwriter_buffers_backend_fsync`, `pg_stat_bgwriter_buffers_checkpoint`,
`pg_stat_bgwriter_buffers_clean`, `pg_stat_bgwriter_checkpoint_sync_time`,
`pg_stat_bgwriter_checkpoint_write_time`, `pg_stat_bgwriter_checkpoints_req`,
`pg_stat_bgwriter_checkpoints_timed`, `pg_stat_bgwriter_maxwritten_clean`,
`pg_stat_bgwriter_stats_reset` - the new names carry a `_total` suffix (for example, `pg_stat_bgwriter_checkpoints_timed` is now `pg_stat_bgwriter_checkpoints_timed_total`). (@thampiotr)

### Bugfixes

- Fixed an issue where the Azure exporter was not correctly gathering subscription-scoped metrics when only one region was configured. (@kgeckhart)

- Fixed an issue where creating a `prometheus.exporter.postgres` component with
multiple `data_source_names` would result in an error. (@thampiotr)

- Fixed a bug with the logs pipeline in static mode that prevented it from shutting down cleanly.

- Updated the SNMP exporter from v0.24.1 to v0.26.0.

v0.40.4 (2024-04-12)
--------------------

4 changes: 2 additions & 2 deletions component/common/net/config.go
@@ -77,9 +77,9 @@ func (g *GRPCConfig) Into(c *dskit.Config) {
c.GRPCServerMaxConnectionAge = g.MaxConnectionAge
c.GRPCServerMaxConnectionAgeGrace = g.MaxConnectionAgeGrace
c.GRPCServerMaxConnectionIdle = g.MaxConnectionIdle
c.GPRCServerMaxRecvMsgSize = g.ServerMaxRecvMsg
c.GRPCServerMaxRecvMsgSize = g.ServerMaxRecvMsg
c.GRPCServerMaxSendMsgSize = g.ServerMaxSendMsg
c.GPRCServerMaxConcurrentStreams = g.ServerMaxConcurrentStreams
c.GRPCServerMaxConcurrentStreams = g.ServerMaxConcurrentStreams
}

// Convert converts the River-based ServerConfig into a dskit.Config object.
8 changes: 4 additions & 4 deletions component/common/net/config_test.go
@@ -50,7 +50,7 @@ func TestConfig(t *testing.T) {
require.Equal(t, time.Second*30, config.ServerGracefulShutdownTimeout)

require.Equal(t, size4MB, config.GRPCServerMaxSendMsgSize)
require.Equal(t, size4MB, config.GPRCServerMaxRecvMsgSize)
require.Equal(t, size4MB, config.GRPCServerMaxRecvMsgSize)
},
},
"overriding defaults": {
@@ -97,7 +97,7 @@ func TestConfig(t *testing.T) {
require.Equal(t, "0.0.0.0", config.GRPCListenAddress)
require.Equal(t, 10, config.GRPCServerMaxSendMsgSize)
// this should have the default applied
require.Equal(t, size4MB, config.GPRCServerMaxRecvMsgSize)
require.Equal(t, size4MB, config.GRPCServerMaxRecvMsgSize)

require.Equal(t, time.Minute, config.ServerGracefulShutdownTimeout)
},
@@ -141,9 +141,9 @@ func TestConfig(t *testing.T) {
require.Equal(t, 5*time.Minute, config.GRPCServerMaxConnectionAge)
require.Equal(t, 6*time.Minute, config.GRPCServerMaxConnectionAgeGrace)
require.Equal(t, 7*time.Minute, config.GRPCServerMaxConnectionIdle)
require.Equal(t, 5, config.GPRCServerMaxRecvMsgSize)
require.Equal(t, 5, config.GRPCServerMaxRecvMsgSize)
require.Equal(t, 6, config.GRPCServerMaxSendMsgSize)
require.Equal(t, uint(7), config.GPRCServerMaxConcurrentStreams)
require.Equal(t, uint(7), config.GRPCServerMaxConcurrentStreams)
},
},
}
6 changes: 6 additions & 0 deletions component/loki/process/metric/metricvec.go
@@ -86,6 +86,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
return ok
}

func (c *metricVec) DeleteAll() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.metrics = map[model.Fingerprint]prometheus.Metric{}
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
5 changes: 5 additions & 0 deletions component/loki/process/stages/decolorize.go
@@ -35,3 +35,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}

// Cleanup implements Stage.
func (*decolorizeStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions component/loki/process/stages/drop.go
@@ -222,3 +222,8 @@ func splitSource(s string) []string {
func (m *dropStage) Name() string {
return StageTypeDrop
}

// Cleanup implements Stage.
func (*dropStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions component/loki/process/stages/eventlogmessage.go
@@ -116,6 +116,11 @@ func (m *eventLogMessageStage) Name() string {
return StageTypeEventLogMessage
}

// Cleanup implements Stage.
func (*eventLogMessageStage) Cleanup() {
// no-op
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
5 changes: 5 additions & 0 deletions component/loki/process/stages/extensions.go
@@ -100,6 +100,11 @@ func (c *cri) Name() string {
return StageTypeCRI
}

// Cleanup implements Stage.
func (*cri) Cleanup() {
// no-op
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)
5 changes: 5 additions & 0 deletions component/loki/process/stages/geoip.go
@@ -147,6 +147,11 @@ func (g *geoIPStage) Name() string {
return StageTypeGeoIP
}

// Cleanup implements Stage.
func (*geoIPStage) Cleanup() {
// no-op
}

func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{}) {
var ip net.IP
if g.cfgs.Source != nil {
5 changes: 5 additions & 0 deletions component/loki/process/stages/json.go
@@ -174,3 +174,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
func (j *jsonStage) Name() string {
return StageTypeJSON
}

// Cleanup implements Stage.
func (*jsonStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions component/loki/process/stages/limit.go
@@ -130,6 +130,11 @@ func (m *limitStage) Name() string {
return StageTypeLimit
}

// Cleanup implements Stage.
func (*limitStage) Cleanup() {
// no-op
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return registerCounterVec(registerer, "loki_process", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
5 changes: 5 additions & 0 deletions component/loki/process/stages/match.go
@@ -196,3 +196,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
func (m *matcherStage) Name() string {
return StageTypeMatch
}

// Cleanup implements Stage.
func (*matcherStage) Cleanup() {
// no-op
}
31 changes: 29 additions & 2 deletions component/loki/process/stages/metric.go
@@ -104,11 +104,11 @@ func newMetricStage(logger log.Logger, config MetricsConfig, registry prometheus
return nil, fmt.Errorf("undefined stage type in '%v', exiting", cfg)
}
}
return toStage(&metricStage{
return &metricStage{
logger: logger,
cfg: config,
metrics: metrics,
}), nil
}, nil
}

// metricStage creates and updates prometheus metrics based on extracted pipeline data
@@ -118,6 +118,19 @@ type metricStage struct {
metrics map[string]cfgCollector
}

func (m *metricStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

for e := range in {
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
out <- e
}
}()
return out
}

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for name, cc := range m.metrics {
@@ -162,6 +175,20 @@ func (m *metricStage) Name() string {
return StageTypeMetric
}

// Cleanup implements Stage.
func (m *metricStage) Cleanup() {
for _, cfgCollector := range m.metrics {
switch vec := cfgCollector.collector.(type) {
case *metric.Counters:
vec.DeleteAll()
case *metric.Gauges:
vec.DeleteAll()
case *metric.Histograms:
vec.DeleteAll()
}
}
}

// recordCounter will update a counter metric
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
7 changes: 7 additions & 0 deletions component/loki/process/stages/metric_test.go
@@ -121,6 +121,13 @@ func TestMetricsPipeline(t *testing.T) {
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

pl.Cleanup()

if err := testutil.GatherAndCompare(registry,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
}

func TestNegativeGauge(t *testing.T) {
5 changes: 5 additions & 0 deletions component/loki/process/stages/multiline.go
@@ -205,3 +205,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
func (m *multilineStage) Name() string {
return StageTypeMultiline
}

// Cleanup implements Stage.
func (*multilineStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions component/loki/process/stages/pack.go
@@ -197,3 +197,8 @@ func (m *packStage) pack(e Entry) Entry {
func (m *packStage) Name() string {
return StageTypePack
}

// Cleanup implements Stage.
func (*packStage) Cleanup() {
// no-op
}
8 changes: 8 additions & 0 deletions component/loki/process/stages/pipeline.go
@@ -143,6 +143,13 @@ func (p *Pipeline) Name() string {
return StageTypePipeline
}

// Cleanup implements Stage.
func (p *Pipeline) Cleanup() {
for _, s := range p.stages {
s.Cleanup()
}
}

// Wrap implements EntryMiddleware
func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
handlerIn := make(chan loki.Entry)
@@ -180,6 +187,7 @@ func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
return loki.NewEntryHandler(handlerIn, func() {
once.Do(func() { close(handlerIn) })
wg.Wait()
p.Cleanup()
})
}

5 changes: 5 additions & 0 deletions component/loki/process/stages/sampling.go
@@ -100,3 +100,8 @@ func (m *samplingStage) randomNumber() uint64 {
func (m *samplingStage) Name() string {
return StageTypeSampling
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
}
6 changes: 6 additions & 0 deletions component/loki/process/stages/stage.go
@@ -60,6 +60,7 @@ type Entry struct {
type Stage interface {
Name() string
Run(chan Entry) chan Entry
Cleanup()
}

func (entry *Entry) copy() *Entry {
@@ -237,3 +238,8 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh
}
return s, nil
}

// Cleanup implements Stage.
func (*stageProcessor) Cleanup() {
// no-op
}
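The change above extends the `Stage` interface in `component/loki/process/stages` with a `Cleanup()` method, so every stage must now implement it (most as no-ops). As a rough sketch only, not part of this commit, a minimal custom stage satisfying the extended interface could look like the following; the `noopStage` type and the import path are assumptions for illustration.

```go
package main

import (
	"github.com/grafana/agent/component/loki/process/stages"
)

// noopStage is a hypothetical stage that illustrates the extended
// interface: Name, Run, and now Cleanup are all required.
type noopStage struct{}

func (noopStage) Name() string { return "noop" }

// Run forwards entries downstream unchanged.
func (noopStage) Run(in chan stages.Entry) chan stages.Entry { return in }

// Cleanup releases any state the stage registered (for example metrics),
// so nothing lingers across a config reload; a no-op here.
func (noopStage) Cleanup() {}

// Compile-time check that noopStage satisfies stages.Stage.
var _ stages.Stage = noopStage{}

func main() {}
```

Stages that register Prometheus series, such as the metric stage above, use `Cleanup` to delete those series when the pipeline is stopped or reloaded.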
5 changes: 5 additions & 0 deletions component/loki/process/stages/structured_metadata.go
@@ -27,6 +27,11 @@ func (s *structuredMetadataStage) Name() string {
return StageTypeStructuredMetadata
}

// Cleanup implements Stage.
func (*structuredMetadataStage) Cleanup() {
// no-op
}

func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
@@ -137,7 +137,7 @@ func (s *PushAPIServer) getRelabelRules() []*relabel.Config {
func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil)
req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest)
if err != nil {
level.Warn(s.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
25 changes: 19 additions & 6 deletions component/prometheus/exporter/postgres/postgres.go
@@ -25,7 +25,7 @@ func init() {

func createExporter(opts component.Options, args component.Arguments, defaultInstanceKey string) (integrations.Integration, string, error) {
a := args.(Arguments)
return integrations.NewIntegrationWithInstanceKey(opts.Logger, a.Convert(), defaultInstanceKey)
return integrations.NewIntegrationWithInstanceKey(opts.Logger, a.convert(opts.ID), defaultInstanceKey)
}

func parsePostgresURL(url string) (map[string]string, error) {
@@ -78,15 +78,26 @@ type Arguments struct {
DataSourceNames []rivertypes.Secret `river:"data_source_names,attr,optional"`

// Attributes
DisableSettingsMetrics bool `river:"disable_settings_metrics,attr,optional"`
DisableDefaultMetrics bool `river:"disable_default_metrics,attr,optional"`
CustomQueriesConfigPath string `river:"custom_queries_config_path,attr,optional"`
DisableSettingsMetrics bool `river:"disable_settings_metrics,attr,optional"`
DisableDefaultMetrics bool `river:"disable_default_metrics,attr,optional"`
CustomQueriesConfigPath string `river:"custom_queries_config_path,attr,optional"`
EnabledCollectors []string `river:"enabled_collectors,attr,optional"`

// Blocks
AutoDiscovery AutoDiscovery `river:"autodiscovery,block,optional"`
}

// Autodiscovery controls discovery of databases outside any specified in DataSourceNames.
func (a *Arguments) Validate() error {
if a.DisableDefaultMetrics && a.CustomQueriesConfigPath == "" {
return fmt.Errorf("custom_queries_config_path must be set when disable_default_metrics is true")
}
if a.DisableDefaultMetrics && len(a.EnabledCollectors) != 0 {
return fmt.Errorf("enabled_collectors cannot be set when disable_default_metrics is true")
}
return nil
}

// AutoDiscovery controls discovery of databases outside any specified in DataSourceNames.
type AutoDiscovery struct {
Enabled bool `river:"enabled,attr,optional"`
DatabaseAllowlist []string `river:"database_allowlist,attr,optional"`
@@ -98,7 +109,7 @@ func (a *Arguments) SetToDefault() {
*a = DefaultArguments
}

func (a *Arguments) Convert() *postgres_exporter.Config {
func (a *Arguments) convert(instanceName string) *postgres_exporter.Config {
return &postgres_exporter.Config{
DataSourceNames: a.convertDataSourceNames(),
DisableSettingsMetrics: a.DisableSettingsMetrics,
@@ -107,6 +118,8 @@ func (a *Arguments) Convert() *postgres_exporter.Config {
IncludeDatabases: a.AutoDiscovery.DatabaseAllowlist,
DisableDefaultMetrics: a.DisableDefaultMetrics,
QueryPath: a.CustomQueriesConfigPath,
Instance: instanceName,
EnabledCollectors: a.EnabledCollectors,
}
}

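The new `Validate` method rejects `disable_default_metrics` when it is set without a `custom_queries_config_path`, or when it is combined with `enabled_collectors`. A minimal sketch of how those errors surface is below, assuming the component's import path, a hypothetical queries file path, and an illustrative collector name; in practice the River config decoder invokes `Validate` for you.

```go
package main

import (
	"fmt"

	"github.com/grafana/agent/component/prometheus/exporter/postgres"
)

func main() {
	var args postgres.Arguments
	args.SetToDefault()

	// disable_default_metrics without a custom queries file is rejected.
	args.DisableDefaultMetrics = true
	if err := args.Validate(); err != nil {
		fmt.Println(err) // custom_queries_config_path must be set when disable_default_metrics is true
	}

	// enabled_collectors cannot be combined with disable_default_metrics either.
	args.CustomQueriesConfigPath = "/etc/agent/queries.yaml" // hypothetical path
	args.EnabledCollectors = []string{"stat_statements"}     // illustrative collector name
	if err := args.Validate(); err != nil {
		fmt.Println(err) // enabled_collectors cannot be set when disable_default_metrics is true
	}
}
```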