Skip to content

Commit

Permalink
Clean up metrics generated from logs after a config reload.
Browse files Browse the repository at this point in the history
Synced from:
grafana/loki#12938
  • Loading branch information
ptodev committed May 15, 2024
1 parent 295e9c9 commit 513fce4
Show file tree
Hide file tree
Showing 17 changed files with 116 additions and 2 deletions.
6 changes: 6 additions & 0 deletions internal/component/loki/process/metric/metricvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
return ok
}

func (c *metricVec) DeleteAll() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.metrics = map[model.Fingerprint]prometheus.Metric{}
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/decolorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}

// Cleanup implements Stage.
func (*decolorizeStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,8 @@ func splitSource(s string) []string {
func (m *dropStage) Name() string {
return StageTypeDrop
}

// Cleanup implements Stage.
func (*dropStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/eventlogmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (m *eventLogMessageStage) Name() string {
return StageTypeEventLogMessage
}

// Cleanup implements Stage.
func (*eventLogMessageStage) Cleanup() {
// no-op
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func (c *cri) Name() string {
return StageTypeCRI
}

// Cleanup implements Stage.
func (*cri) Cleanup() {
// no-op
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ func (g *geoIPStage) Name() string {
return StageTypeGeoIP
}

// Cleanup implements Stage.
func (*geoIPStage) Cleanup() {
// no-op
}

func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{}) {
var ip net.IP
if g.cfgs.Source != nil {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
func (j *jsonStage) Name() string {
return StageTypeJSON
}

// Cleanup implements Stage.
func (*jsonStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func (m *limitStage) Name() string {
return StageTypeLimit
}

// Cleanup implements Stage.
func (*limitStage) Cleanup() {
// no-op
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return registerCounterVec(registerer, "loki_process", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
func (m *matcherStage) Name() string {
return StageTypeMatch
}

// Cleanup implements Stage.
func (*matcherStage) Cleanup() {
// no-op
}
31 changes: 29 additions & 2 deletions internal/component/loki/process/stages/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func newMetricStage(logger log.Logger, config MetricsConfig, registry prometheus
return nil, fmt.Errorf("undefined stage type in '%v', exiting", cfg)
}
}
return toStage(&metricStage{
return &metricStage{
logger: logger,
cfg: config,
metrics: metrics,
}), nil
}, nil
}

// metricStage creates and updates prometheus metrics based on extracted pipeline data
Expand All @@ -118,6 +118,19 @@ type metricStage struct {
metrics map[string]cfgCollector
}

func (m *metricStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

for e := range in {
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
out <- e
}
}()
return out
}

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for name, cc := range m.metrics {
Expand Down Expand Up @@ -162,6 +175,20 @@ func (m *metricStage) Name() string {
return StageTypeMetric
}

// Cleanup implements Stage.
func (m *metricStage) Cleanup() {
for _, cfgCollector := range m.metrics {
switch vec := cfgCollector.collector.(type) {
case *metric.Counters:
vec.DeleteAll()
case *metric.Gauges:
vec.DeleteAll()
case *metric.Histograms:
vec.DeleteAll()
}
}
}

// recordCounter will update a counter metric
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
Expand Down
7 changes: 7 additions & 0 deletions internal/component/loki/process/stages/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ func TestMetricsPipeline(t *testing.T) {
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

pl.Cleanup()

if err := testutil.GatherAndCompare(registry,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
}

func TestNegativeGauge(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
func (m *multilineStage) Name() string {
return StageTypeMultiline
}

// Cleanup implements Stage.
func (*multilineStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,8 @@ func (m *packStage) pack(e Entry) Entry {
func (m *packStage) Name() string {
return StageTypePack
}

// Cleanup implements Stage.
func (*packStage) Cleanup() {
// no-op
}
8 changes: 8 additions & 0 deletions internal/component/loki/process/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ func (p *Pipeline) Name() string {
return StageTypePipeline
}

// Cleanup implements Stage.
func (p *Pipeline) Cleanup() {
for _, s := range p.stages {
s.Cleanup()
}
}

// Wrap implements EntryMiddleware
func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
handlerIn := make(chan loki.Entry)
Expand Down Expand Up @@ -181,6 +188,7 @@ func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
return loki.NewEntryHandler(handlerIn, func() {
once.Do(func() { close(handlerIn) })
wg.Wait()
p.Cleanup()
})
}

Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,8 @@ func (m *samplingStage) randomNumber() uint64 {
func (m *samplingStage) Name() string {
return StageTypeSampling
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
}
6 changes: 6 additions & 0 deletions internal/component/loki/process/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Entry struct {
type Stage interface {
Name() string
Run(chan Entry) chan Entry
Cleanup()
}

func (entry *Entry) copy() *Entry {
Expand Down Expand Up @@ -243,3 +244,8 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh
}
return s, nil
}

// Cleanup implements Stage.
func (*stageProcessor) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/structured_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (s *structuredMetadataStage) Name() string {
return StageTypeStructuredMetadata
}

// Cleanup implements Stage.
func (*structuredMetadataStage) Cleanup() {
// no-op
}

func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
Expand Down

0 comments on commit 513fce4

Please sign in to comment.