Skip to content

Commit

Permalink
Add code to check for staleness in labelstore (#5970)
Browse files Browse the repository at this point in the history
* Add code to check for staleness which touched a ton of tests since I also added metrics.

* Update metric

* Add explicit _ for register

* pr feedback on changing to gauges

* updated comment with test results
  • Loading branch information
mattdurham authored Dec 15, 2023
1 parent bb1d64d commit fd12c48
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 47 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ Main (unreleased)

- Fixes `otelcol.connector.servicegraph` store ttl default value from 2ms to 2s. (@rlankfo)

- Add staleness tracking to labelstore to reduce memory usage. (@mattdurham)

### Other changes

- Bump github.com/IBM/sarama from v1.41.2 to v1.42.1
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (fr *flowRun) Run(configPath string) error {
return fmt.Errorf("failed to create otel service")
}

labelService := labelstore.New(l)
labelService := labelstore.New(l, reg)

f := flow.New(flow.Options{
Logger: l,
Expand Down
12 changes: 8 additions & 4 deletions component/prometheus/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ import (
"time"

"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/client_golang/prometheus"

"github.com/hashicorp/go-multierror"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"

"github.com/prometheus/prometheus/storage"
)

Expand Down Expand Up @@ -112,6 +110,12 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
if ref == 0 {
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}
if value.IsStaleNaN(v) {
a.ls.AddStaleMarker(uint64(ref), l)
} else {
// Tested this to ensure it had no cpu impact, since it is called so often.
a.ls.RemoveStaleMarker(uint64(ref))
}
var multiErr error
updated := false
for _, x := range a.children {
Expand Down
4 changes: 2 additions & 2 deletions component/prometheus/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ import (
)

func TestRollback(t *testing.T) {
ls := labelstore.New(nil)
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
app := fanout.Appender(context.Background())
err := app.Rollback()
require.NoError(t, err)
}

func TestCommit(t *testing.T) {
ls := labelstore.New(nil)
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
app := fanout.Appender(context.Background())
err := app.Commit()
Expand Down
8 changes: 8 additions & 0 deletions component/prometheus/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
)

Expand Down Expand Up @@ -102,6 +103,13 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
}

if value.IsStaleNaN(v) {
a.ls.AddStaleMarker(uint64(ref), l)
} else {
// Tested this to ensure it had no cpu impact, since it is called so often.
a.ls.RemoveStaleMarker(uint64(ref))
}

if a.interceptor.onAppend != nil {
return a.interceptor.onAppend(ref, l, t, v, a.child)
}
Expand Down
5 changes: 3 additions & 2 deletions component/prometheus/operator/common/crdmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/agent/component/prometheus/operator"
"github.com/grafana/agent/service/cluster"
"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
Expand All @@ -33,7 +34,7 @@ func TestClearConfigsSameNsSamePrefix(t *testing.T) {
logger,
&operator.DefaultArguments,
KindServiceMonitor,
labelstore.New(logger),
labelstore.New(logger, prometheus.DefaultRegisterer),
)

m.discoveryManager = newMockDiscoveryManager()
Expand Down Expand Up @@ -98,7 +99,7 @@ func TestClearConfigsProbe(t *testing.T) {
logger,
&operator.DefaultArguments,
KindProbe,
labelstore.New(logger),
labelstore.New(logger, prometheus.DefaultRegisterer),
)

m.discoveryManager = newMockDiscoveryManager()
Expand Down
4 changes: 2 additions & 2 deletions component/prometheus/receive_http/receive_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func testAppendable(actualSamples chan testSample) []storage.Appendable {
return ref, nil
}

ls := labelstore.New(nil)
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
return []storage.Appendable{agentprom.NewInterceptor(
nil,
ls,
Expand Down Expand Up @@ -385,7 +385,7 @@ func testOptions(t *testing.T) component.Options {
Logger: util.TestFlowLogger(t),
Registerer: prometheus.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
return labelstore.New(nil), nil
return labelstore.New(nil, prometheus.DefaultRegisterer), nil
},
}
}
Expand Down
12 changes: 6 additions & 6 deletions component/prometheus/relabel/relabel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

func TestCache(t *testing.T) {
lc := labelstore.New(nil)
lc := labelstore.New(nil, prom.DefaultRegisterer)
relabeller := generateRelabel(t)
lbls := labels.FromStrings("__address__", "localhost")
relabeller.relabel(0, lbls)
Expand All @@ -50,7 +50,7 @@ func TestUpdateReset(t *testing.T) {
}

func TestNil(t *testing.T) {
ls := labelstore.New(nil)
ls := labelstore.New(nil, prom.DefaultRegisterer)
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
require.True(t, false)
return ref, nil
Expand All @@ -61,7 +61,7 @@ func TestNil(t *testing.T) {
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
return labelstore.New(nil), nil
return labelstore.New(nil, prom.DefaultRegisterer), nil
},
}, Arguments{
ForwardTo: []storage.Appendable{fanout},
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestLRUNaN(t *testing.T) {
}

func BenchmarkCache(b *testing.B) {
ls := labelstore.New(nil)
ls := labelstore.New(nil, prom.DefaultRegisterer)
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
require.True(b, l.Has("new_label"))
return ref, nil
Expand Down Expand Up @@ -137,7 +137,7 @@ func BenchmarkCache(b *testing.B) {
}

func generateRelabel(t *testing.T) *Component {
ls := labelstore.New(nil)
ls := labelstore.New(nil, prom.DefaultRegisterer)
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
require.True(t, l.Has("new_label"))
return ref, nil
Expand All @@ -148,7 +148,7 @@ func generateRelabel(t *testing.T) *Component {
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
return labelstore.New(nil), nil
return labelstore.New(nil, prom.DefaultRegisterer), nil
},
}, Arguments{
ForwardTo: []storage.Appendable{fanout},
Expand Down
6 changes: 3 additions & 3 deletions component/prometheus/scrape/scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestForwardingToAppendable(t *testing.T) {
case cluster.ServiceName:
return cluster.Mock(), nil
case labelstore.ServiceName:
return labelstore.New(nil), nil
return labelstore.New(nil, prometheus_client.DefaultRegisterer), nil
default:
return nil, fmt.Errorf("service %q does not exist", name)
}
Expand All @@ -114,7 +114,7 @@ func TestForwardingToAppendable(t *testing.T) {
// Update the component with a mock receiver; it should be passed along to the Appendable.
var receivedTs int64
var receivedSamples labels.Labels
ls := labelstore.New(nil)
ls := labelstore.New(nil, prometheus_client.DefaultRegisterer)
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, t int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
receivedTs = t
receivedSamples = l
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestCustomDialer(t *testing.T) {
case cluster.ServiceName:
return cluster.Mock(), nil
case labelstore.ServiceName:
return labelstore.New(nil), nil
return labelstore.New(nil, prometheus_client.DefaultRegisterer), nil

default:
return nil, fmt.Errorf("service %q does not exist", name)
Expand Down
3 changes: 2 additions & 1 deletion converter/internal/test_common/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
cluster_service "github.com/grafana/agent/service/cluster"
http_service "github.com/grafana/agent/service/http"
"github.com/grafana/agent/service/labelstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -194,7 +195,7 @@ func attemptLoadingFlowConfig(t *testing.T, river []byte) {
// properly.
http_service.New(http_service.Options{}),
clusterService,
labelstore.New(nil),
labelstore.New(nil, prometheus.DefaultRegisterer),
},
})
err = f.LoadSource(cfg, nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/flow/componenttest/componenttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) (
GetServiceData: func(name string) (interface{}, error) {
switch name {
case labelstore.ServiceName:
return labelstore.New(nil), nil
return labelstore.New(nil, prometheus.DefaultRegisterer), nil
default:
return nil, fmt.Errorf("no service named %s defined", name)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/flow/module_caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
http_service "github.com/grafana/agent/service/http"
"github.com/grafana/agent/service/labelstore"
otel_service "github.com/grafana/agent/service/otel"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

Expand Down Expand Up @@ -164,7 +165,7 @@ func testOptions(t *testing.T) flow.Options {
http_service.New(http_service.Options{}),
clusterService,
otelService,
labelstore.New(nil),
labelstore.New(nil, prometheus.DefaultRegisterer),
},
}
}
Expand Down
73 changes: 56 additions & 17 deletions service/labelstore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,26 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/agent/pkg/flow/logging/level"
agent_service "github.com/grafana/agent/service"
flow_service "github.com/grafana/agent/service"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
)

const ServiceName = "labelstore"

type service struct {
log log.Logger
mut sync.Mutex
globalRefID uint64
mappings map[string]*remoteWriteMapping
labelsHashToGlobal map[uint64]uint64
staleGlobals map[uint64]*staleMarker
log log.Logger
mut sync.Mutex
globalRefID uint64
mappings map[string]*remoteWriteMapping
labelsHashToGlobal map[uint64]uint64
staleGlobals map[uint64]*staleMarker
totalIDs *prometheus.Desc
idsInRemoteWrapping *prometheus.Desc
lastStaleCheck prometheus.Gauge
}

type staleMarker struct {
globalID uint64
lastMarkedStale time.Time
Expand All @@ -32,17 +36,26 @@ type Arguments struct{}

var _ flow_service.Service = (*service)(nil)

func New(l log.Logger) *service {
func New(l log.Logger, r prometheus.Registerer) *service {
if l == nil {
l = log.NewNopLogger()
}
return &service{
log: l,
globalRefID: 0,
mappings: make(map[string]*remoteWriteMapping),
labelsHashToGlobal: make(map[uint64]uint64),
staleGlobals: make(map[uint64]*staleMarker),
s := &service{
log: l,
globalRefID: 0,
mappings: make(map[string]*remoteWriteMapping),
labelsHashToGlobal: make(map[uint64]uint64),
staleGlobals: make(map[uint64]*staleMarker),
totalIDs: prometheus.NewDesc("agent_labelstore_global_ids_count", "Total number of global ids.", nil, nil),
idsInRemoteWrapping: prometheus.NewDesc("agent_labelstore_remote_store_ids_count", "Total number of ids per remote write", []string{"remote_name"}, nil),
lastStaleCheck: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "agent_labelstore_last_stale_check_timestamp",
Help: "Last time stale check was ran expressed in unix timestamp.",
}),
}
_ = r.Register(s.lastStaleCheck)
_ = r.Register(s)
return s
}

// Definition returns the Definition of the Service.
Expand All @@ -56,12 +69,33 @@ func (s *service) Definition() agent_service.Definition {
}
}

func (s *service) Describe(m chan<- *prometheus.Desc) {
m <- s.totalIDs
m <- s.idsInRemoteWrapping
}
func (s *service) Collect(m chan<- prometheus.Metric) {
s.mut.Lock()
defer s.mut.Unlock()

m <- prometheus.MustNewConstMetric(s.totalIDs, prometheus.GaugeValue, float64(len(s.labelsHashToGlobal)))
for name, rw := range s.mappings {
m <- prometheus.MustNewConstMetric(s.idsInRemoteWrapping, prometheus.GaugeValue, float64(len(rw.globalToLocal)), name)
}
}

// Run starts a Service. Run must block until the provided
// context is canceled. Returning an error should be treated
// as a fatal error for the Service.
func (s *service) Run(ctx context.Context, host agent_service.Host) error {
<-ctx.Done()
return nil
staleCheck := time.NewTicker(10 * time.Minute)
for {
select {
case <-ctx.Done():
return nil
case <-staleCheck.C:
s.CheckAndRemoveStaleMarkers()
}
}
}

// Update updates a Service at runtime. Update is never
Expand All @@ -72,7 +106,7 @@ func (s *service) Run(ctx context.Context, host agent_service.Host) error {
//
// Update will be called once before Run, and may be called
// while Run is active.
func (s *service) Update(newConfig any) error {
func (s *service) Update(_ any) error {
return nil
}

Expand Down Expand Up @@ -190,6 +224,8 @@ func (s *service) CheckAndRemoveStaleMarkers() {
s.mut.Lock()
defer s.mut.Unlock()

s.lastStaleCheck.Set(float64(time.Now().Unix()))
level.Debug(s.log).Log("msg", "labelstore removing stale markers")
curr := time.Now()
idsToBeGCed := make([]*staleMarker, 0)
for _, stale := range s.staleGlobals {
Expand All @@ -199,6 +235,9 @@ func (s *service) CheckAndRemoveStaleMarkers() {
}
idsToBeGCed = append(idsToBeGCed, stale)
}

level.Debug(s.log).Log("msg", "number of ids to remove", "count", len(idsToBeGCed))

for _, marker := range idsToBeGCed {
delete(s.staleGlobals, marker.globalID)
delete(s.labelsHashToGlobal, marker.labelHash)
Expand Down
Loading

0 comments on commit fd12c48

Please sign in to comment.