diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index b517957833a9..6481f31c8e9c 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -131,7 +131,7 @@ func New( } // Configure ObjectClient and IndexShipper for series and chunk management - objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageCfg, clientMetrics) + objectClient, err := storage.NewObjectClient("bloom-compactor", periodicConfig.ObjectType, storageCfg, clientMetrics, r) if err != nil { return nil, fmt.Errorf("error creating object client '%s': %w", periodicConfig.ObjectType, err) } diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index 6a71f0979abc..e7c0a0276909 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -497,10 +497,11 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string func GetObjectClient(store string, conf loki.Config, cm storage.ClientMetrics) (chunk.ObjectClient, error) { oc, err := storage.NewObjectClient( + "log-cli-query", store, conf.StorageConfig, cm, - ) + prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 9bc969dc7745..8424c6add2f6 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -607,9 +607,7 @@ func (t *Loki) initTableManager() (services.Service, error) { os.Exit(1) } - reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer) - - tableClient, err := storage.NewTableClient(lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.clientMetrics, reg, util_log.Logger) + tableClient, err := storage.NewTableClient("table-manager-store", lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.clientMetrics, prometheus.DefaultRegisterer, util_log.Logger) if err != nil { return nil, err } @@ -1215,7 +1213,7 @@ func (t *Loki) initCompactor() (services.Service, error) { continue } - objectClient, err := storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.clientMetrics) + objectClient, err := storage.NewObjectClient("compactor", periodConfig.ObjectType, t.Cfg.StorageConfig, t.clientMetrics, prometheus.DefaultRegisterer) if err != nil { return nil, fmt.Errorf("failed to create object client: %w", err) } @@ -1226,7 +1224,7 @@ func (t *Loki) initCompactor() (services.Service, error) { var deleteRequestStoreClient client.ObjectClient if t.Cfg.CompactorConfig.RetentionEnabled { if deleteStore := t.Cfg.CompactorConfig.DeleteRequestStore; deleteStore != "" { - if deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.clientMetrics); err != nil { + if deleteRequestStoreClient, err = storage.NewObjectClient("compactor", deleteStore, t.Cfg.StorageConfig, t.clientMetrics, prometheus.DefaultRegisterer); err != nil { return nil, fmt.Errorf("failed to create delete request store object client: %w", err) } } else { @@ -1319,7 +1317,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) { } tableRange := period.GetIndexTableNumberRange(periodEndTime) - indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, shardingStrategy, + indexClient, err := storage.NewIndexClient("index-gateway", period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, shardingStrategy, prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())), t.Cfg.MetricsNamespace, ) if err != nil { @@ -1519,7 +1517,7 @@ func (t *Loki) initAnalytics() (services.Service, error) { return nil, err } - objectClient, err := storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.clientMetrics) + objectClient, err := storage.NewObjectClient("analytics", period.ObjectType, t.Cfg.StorageConfig, t.clientMetrics, prometheus.DefaultRegisterer) if err != nil { level.Info(util_log.Logger).Log("msg", "failed to initialize usage report", "err", err) return nil, nil diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 5f19b3954879..6490baf76497 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -190,8 +190,11 @@ func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus return bucketClient } + reg = prometheus.WrapRegistererWithPrefix("loki_", reg) + reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg) + return objstore.WrapWithMetrics( bucketClient, - prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg), + reg, "") } diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 538ab9bc2c25..6237b07cd9ea 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -413,7 +413,7 @@ func (cfg *Config) Validate() error { } // NewIndexClient creates a new index client of the desired type specified in the PeriodConfig -func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, shardingStrategy indexgateway.ShardingStrategy, registerer prometheus.Registerer, logger log.Logger, metricsNamespace string) (index.Client, error) { +func NewIndexClient(component string, periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, shardingStrategy indexgateway.ShardingStrategy, registerer prometheus.Registerer, logger log.Logger, metricsNamespace string) (index.Client, error) { switch true { case util.StringsContain(testingStorageTypes, periodCfg.IndexType): @@ -444,7 +444,7 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, return client, nil } - objectClient, err := NewObjectClient(periodCfg.ObjectType, cfg, cm) + objectClient, err := NewObjectClient(component, periodCfg.ObjectType, cfg, cm, registerer) if err != nil { return nil, err } @@ -505,7 +505,7 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, } // NewChunkClient makes a new chunk.Client of the desired types. -func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, cc congestion.Controller, registerer prometheus.Registerer, clientMetrics ClientMetrics, logger log.Logger) (client.Client, error) { +func NewChunkClient(component, name string, cfg Config, schemaCfg config.SchemaConfig, cc congestion.Controller, registerer prometheus.Registerer, clientMetrics ClientMetrics, logger log.Logger) (client.Client, error) { var storeType = name // lookup storeType for named stores @@ -518,7 +518,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, cc c case util.StringsContain(testingStorageTypes, storeType): switch storeType { case config.StorageTypeInMemory: - c, err := NewObjectClient(name, cfg, clientMetrics) + c, err := NewObjectClient(component, name, cfg, clientMetrics, registerer) if err != nil { return nil, err } @@ -528,21 +528,21 @@ func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, cc c case util.StringsContain(supportedStorageTypes, storeType): switch storeType { case config.StorageTypeFileSystem: - c, err := NewObjectClient(name, cfg, clientMetrics) + c, err := NewObjectClient(component, name, cfg, clientMetrics, registerer) if err != nil { return nil, err } return client.NewClientWithMaxParallel(c, client.FSEncoder, cfg.MaxParallelGetChunk, schemaCfg), nil case config.StorageTypeAWS, config.StorageTypeS3, config.StorageTypeAzure, config.StorageTypeBOS, config.StorageTypeSwift, config.StorageTypeCOS, config.StorageTypeAlibabaCloud: - c, err := NewObjectClient(name, cfg, clientMetrics) + c, err := NewObjectClient(component, name, cfg, clientMetrics, registerer) if err != nil { return nil, err } return client.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk, schemaCfg), nil case config.StorageTypeGCS: - c, err := NewObjectClient(name, cfg, clientMetrics) + c, err := NewObjectClient(component, name, cfg, clientMetrics, registerer) if err != nil { return nil, err } @@ -583,7 +583,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, cc c } // NewTableClient makes a new table client based on the configuration. -func NewTableClient(name string, periodCfg config.PeriodConfig, cfg Config, cm ClientMetrics, registerer prometheus.Registerer, logger log.Logger) (index.TableClient, error) { +func NewTableClient(component, name string, periodCfg config.PeriodConfig, cfg Config, cm ClientMetrics, registerer prometheus.Registerer, logger log.Logger) (index.TableClient, error) { switch true { case util.StringsContain(testingStorageTypes, name): switch name { @@ -592,7 +592,7 @@ func NewTableClient(name string, periodCfg config.PeriodConfig, cfg Config, cm C } case util.StringsContain(supportedIndexTypes, name): - objectClient, err := NewObjectClient(periodCfg.ObjectType, cfg, cm) + objectClient, err := NewObjectClient(component, periodCfg.ObjectType, cfg, cm, registerer) if err != nil { return nil, err } @@ -647,8 +647,8 @@ func (c *ClientMetrics) Unregister() { } // NewObjectClient makes a new StorageClient with the prefix in the front. -func NewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { - actual, err := internalNewObjectClient(name, cfg, clientMetrics) +func NewObjectClient(component, name string, cfg Config, clientMetrics ClientMetrics, reg prometheus.Registerer) (client.ObjectClient, error) { + actual, err := internalNewObjectClient(component, name, cfg, clientMetrics, reg) if err != nil { return nil, err } @@ -662,12 +662,17 @@ func NewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (clie } // internalNewObjectClient makes the underlying StorageClient of the desired types. -func internalNewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { +func internalNewObjectClient(component, name string, cfg Config, clientMetrics ClientMetrics, reg prometheus.Registerer) (client.ObjectClient, error) { var ( namedStore string storeType = name ) + // preserve olf reg behaviour + if !cfg.ThanosObjStore { + reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, reg) + } + // lookup storeType for named stores if nsType, ok := cfg.NamedStores.storeType[name]; ok { storeType = nsType @@ -717,10 +722,7 @@ func internalNewObjectClient(name string, cfg Config, clientMetrics ClientMetric gcsCfg.EnableRetries = false } if cfg.ThanosObjStore { - // Passing "gcs" as the component name as currently it's not - // possible to get the component called this method - // TODO(JoaoBraveCoding) update compoent when bigger refactor happens - return gcp.NewGCSThanosObjectClient(context.Background(), cfg.ObjStoreConf, "gcs", utilLog.Logger, cfg.Hedging, prometheus.WrapRegistererWithPrefix("loki_", prometheus.NewRegistry())) + return gcp.NewGCSThanosObjectClient(context.Background(), cfg.ObjStoreConf, component, utilLog.Logger, cfg.Hedging, reg) } return gcp.NewGCSObjectClient(context.Background(), gcsCfg, cfg.Hedging) diff --git a/pkg/storage/factory_test.go b/pkg/storage/factory_test.go index 2588c9dc69dd..242f99abc9bf 100644 --- a/pkg/storage/factory_test.go +++ b/pkg/storage/factory_test.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -233,7 +234,7 @@ func TestNewObjectClient_prefixing(t *testing.T) { var cfg Config flagext.DefaultValues(&cfg) - objectClient, err := NewObjectClient("inmemory", cfg, cm) + objectClient, err := NewObjectClient("inmemory", "inmemory", cfg, cm, prometheus.NewRegistry()) require.NoError(t, err) _, ok := objectClient.(client.PrefixedObjectClient) @@ -245,7 +246,7 @@ func TestNewObjectClient_prefixing(t *testing.T) { flagext.DefaultValues(&cfg) cfg.ObjectPrefix = "my/prefix/" - objectClient, err := NewObjectClient("inmemory", cfg, cm) + objectClient, err := NewObjectClient("inmemory", "inmemory", cfg, cm, prometheus.NewRegistry()) require.NoError(t, err) prefixed, ok := objectClient.(client.PrefixedObjectClient) @@ -258,7 +259,7 @@ func TestNewObjectClient_prefixing(t *testing.T) { flagext.DefaultValues(&cfg) cfg.ObjectPrefix = "my/prefix" - objectClient, err := NewObjectClient("inmemory", cfg, cm) + objectClient, err := NewObjectClient("inmemory", "inmemory", cfg, cm, prometheus.NewRegistry()) require.NoError(t, err) prefixed, ok := objectClient.(client.PrefixedObjectClient) @@ -271,7 +272,7 @@ func TestNewObjectClient_prefixing(t *testing.T) { flagext.DefaultValues(&cfg) cfg.ObjectPrefix = "/my/prefix/" - objectClient, err := NewObjectClient("inmemory", cfg, cm) + objectClient, err := NewObjectClient("inmemory", "inmemory", cfg, cm, prometheus.NewRegistry()) require.NoError(t, err) prefixed, ok := objectClient.(client.PrefixedObjectClient) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 6781dbbff8a3..a71af3dc3d78 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -216,9 +216,6 @@ func (s *LokiStore) chunkClientForPeriod(p config.PeriodConfig) (client.Client, if objectStoreType == "" { objectStoreType = p.IndexType } - chunkClientReg := prometheus.WrapRegistererWith( - prometheus.Labels{"component": "chunk-store-" + p.From.String()}, s.registerer) - var cc congestion.Controller ccCfg := s.cfg.CongestionControl @@ -230,7 +227,8 @@ func (s *LokiStore) chunkClientForPeriod(p config.PeriodConfig) (client.Client, ) } - chunks, err := NewChunkClient(objectStoreType, s.cfg, s.schemaCfg, cc, chunkClientReg, s.clientMetrics, s.logger) + component := "chunk-store-" + p.From.String() + chunks, err := NewChunkClient(component, objectStoreType, s.cfg, s.schemaCfg, cc, s.registerer, s.clientMetrics, s.logger) if err != nil { return nil, errors.Wrap(err, "error creating object client") } @@ -253,14 +251,8 @@ func shouldUseIndexGatewayClient(cfg indexshipper.Config) bool { } func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.TableRange, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, index.ReaderWriter, func(), error) { - indexClientReg := prometheus.WrapRegistererWith( - prometheus.Labels{ - "component": fmt.Sprintf( - "index-store-%s-%s", - p.IndexType, - p.From.String(), - ), - }, s.registerer) + component := fmt.Sprintf("index-store-%s-%s", p.IndexType, p.From.String()) + indexClientReg := prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, s.registerer) indexClientLogger := log.With(s.logger, "index-store", fmt.Sprintf("%s-%s", p.IndexType, p.From.String())) if p.IndexType == config.TSDBType { @@ -278,7 +270,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl }, nil } - objectClient, err := NewObjectClient(p.ObjectType, s.cfg, s.clientMetrics) + objectClient, err := NewObjectClient(component, p.ObjectType, s.cfg, s.clientMetrics, s.registerer) if err != nil { return nil, nil, nil, err } @@ -301,7 +293,7 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl }, nil } - idx, err := NewIndexClient(p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg, indexClientLogger, s.metricsNamespace) + idx, err := NewIndexClient(component, p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, s.registerer, indexClientLogger, s.metricsNamespace) if err != nil { return nil, nil, nil, errors.Wrap(err, "error creating index client") } diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index a68959e1d908..cc4d282b48ef 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/grafana/dskit/concurrency" @@ -94,7 +95,7 @@ type Client interface { func NewBloomClient(periodicConfigs []config.PeriodConfig, storageConfig storage.Config, clientMetrics storage.ClientMetrics) (*BloomClient, error) { periodicObjectClients := make(map[config.DayTime]client.ObjectClient) for _, periodicConfig := range periodicConfigs { - objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageConfig, clientMetrics) + objectClient, err := storage.NewObjectClient("bloom-shipper", periodicConfig.ObjectType, storageConfig, clientMetrics, prometheus.DefaultRegisterer) if err != nil { return nil, fmt.Errorf("error creating object client '%s': %w", periodicConfig.ObjectType, err) } diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util_test.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util_test.go index 12dbfb8d9a7d..a716f5522041 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util_test.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util_test.go @@ -10,6 +10,7 @@ import ( ww "github.com/grafana/dskit/server" "github.com/grafana/dskit/user" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -128,11 +129,11 @@ type testObjectClient struct { } func newTestObjectClient(path string, clientMetrics storage.ClientMetrics) client.ObjectClient { - c, err := storage.NewObjectClient("filesystem", storage.Config{ + c, err := storage.NewObjectClient("compactor", "filesystem", storage.Config{ FSConfig: local.FSConfig{ Directory: path, }, - }, clientMetrics) + }, clientMetrics, prometheus.NewRegistry()) if err != nil { panic(err) } diff --git a/tools/tsdb/bloom-tester/lib.go b/tools/tsdb/bloom-tester/lib.go index 7eefb56342c4..d46f043c8612 100644 --- a/tools/tsdb/bloom-tester/lib.go +++ b/tools/tsdb/bloom-tester/lib.go @@ -47,7 +47,7 @@ func execute() { periodCfg, tableRange, tableName, err := helpers.GetPeriodConfigForTableNumber(bucket, conf.SchemaConfig.Configs) helpers.ExitErr("find period config for bucket", err) - objectClient, err := storage.NewObjectClient(periodCfg.ObjectType, conf.StorageConfig, clientMetrics) + objectClient, err := storage.NewObjectClient("bloom-tester", periodCfg.ObjectType, conf.StorageConfig, clientMetrics, prometheus.DefaultRegisterer) helpers.ExitErr("creating object client", err) chunkClient := client.NewClient(objectClient, nil, conf.SchemaConfig) diff --git a/tools/tsdb/bloom-tester/readlib.go b/tools/tsdb/bloom-tester/readlib.go index eaca7a38c15b..3008043f8b9c 100644 --- a/tools/tsdb/bloom-tester/readlib.go +++ b/tools/tsdb/bloom-tester/readlib.go @@ -66,7 +66,7 @@ func executeRead() { periodCfg, tableRange, tableName, err := helpers.GetPeriodConfigForTableNumber(bucket, conf.SchemaConfig.Configs) helpers.ExitErr("find period config for bucket", err) - objectClient, err := storage.NewObjectClient(periodCfg.ObjectType, conf.StorageConfig, clientMetrics) + objectClient, err := storage.NewObjectClient("bloom-tester", periodCfg.ObjectType, conf.StorageConfig, clientMetrics, prometheus.DefaultRegisterer) helpers.ExitErr("creating object client", err) chunkClient := client.NewClient(objectClient, nil, conf.SchemaConfig) diff --git a/tools/tsdb/index-analyzer/main.go b/tools/tsdb/index-analyzer/main.go index fd59bd4792fd..87162630675b 100644 --- a/tools/tsdb/index-analyzer/main.go +++ b/tools/tsdb/index-analyzer/main.go @@ -24,7 +24,7 @@ func main() { periodCfg, tableRange, tableName, err := helpers.GetPeriodConfigForTableNumber(bucket, conf.SchemaConfig.Configs) helpers.ExitErr("find period config for bucket", err) - objectClient, err := storage.NewObjectClient(periodCfg.ObjectType, conf.StorageConfig, clientMetrics) + objectClient, err := storage.NewObjectClient("index-analyzer", periodCfg.ObjectType, conf.StorageConfig, clientMetrics, prometheus.DefaultRegisterer) helpers.ExitErr("creating object client", err) shipper, err := indexshipper.NewIndexShipper( diff --git a/tools/tsdb/migrate-versions/main.go b/tools/tsdb/migrate-versions/main.go index b458c80d4c1b..bb9c3b8f23bc 100644 --- a/tools/tsdb/migrate-versions/main.go +++ b/tools/tsdb/migrate-versions/main.go @@ -97,7 +97,7 @@ func main() { } func migrateTables(pCfg config.PeriodConfig, storageCfg storage.Config, clientMetrics storage.ClientMetrics, tableRange config.TableRange) error { - objClient, err := storage.NewObjectClient(pCfg.ObjectType, storageCfg, clientMetrics) + objClient, err := storage.NewObjectClient("tables-migration-tool", pCfg.ObjectType, storageCfg, clientMetrics, prometheus.DefaultRegisterer) if err != nil { return err } diff --git a/tools/tsdb/migrate-versions/main_test.go b/tools/tsdb/migrate-versions/main_test.go index 2f4690fde0a7..cfadbdd52661 100644 --- a/tools/tsdb/migrate-versions/main_test.go +++ b/tools/tsdb/migrate-versions/main_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -52,7 +53,7 @@ func TestMigrateTables(t *testing.T) { } clientMetrics := storage.NewClientMetrics() - objClient, err := storage.NewObjectClient(pcfg.ObjectType, storageCfg, clientMetrics) + objClient, err := storage.NewObjectClient("tables-migration-tool", pcfg.ObjectType, storageCfg, clientMetrics, prometheus.DefaultRegisterer) require.NoError(t, err) indexStorageClient := shipperstorage.NewIndexStorageClient(objClient, pcfg.IndexTables.PathPrefix)