From ec00fec0af11b2445a8b9dbe7ddaa8f7c2a4e8a3 Mon Sep 17 00:00:00 2001 From: Joao Marcal Date: Mon, 30 Sep 2024 11:52:12 +0100 Subject: [PATCH] chore: add ObjectExistsWithSize to respect the interface new method was added in https://github.com/grafana/loki/pull/14268 --- pkg/storage/bucket/client.go | 29 +++++++------------ pkg/storage/chunk/client/client.go | 5 ++-- .../client/gcp/gcs_thanos_object_client.go | 15 ++++++++++ 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 0e2b69ed680a1..504addbc66689 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -183,16 +183,7 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, client = NewPrefixedBucketClient(client, cfg.StoragePrefix) } - if metrics.BucketMetrics != nil { - client = bucketWrapWith(client, metrics.BucketMetrics) - } else { - bucketMetrics := bucketMetrics(name, metrics.Registerer) - client = bucketWrapWith(client, bucketMetrics) - // Save metrics to be assigned to other buckets created with the same component name - metrics.BucketMetrics = bucketMetrics - } - - instrumentedClient := objstoretracing.WrapWithTraces(client) + instrumentedClient := objstoretracing.WrapWithTraces(bucketWithMetrics(client, name, metrics)) // Wrap the client with any provided middleware for _, wrap := range cfg.Middlewares { @@ -205,16 +196,18 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, return instrumentedClient, nil } -func bucketMetrics(name string, reg prometheus.Registerer) *objstore.Metrics { - reg = prometheus.WrapRegistererWithPrefix("loki_", reg) - reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg) - return objstore.BucketMetrics(reg, "") -} - -func bucketWrapWith(bucketClient objstore.Bucket, metrics *objstore.Metrics) objstore.Bucket { +func bucketWithMetrics(bucketClient objstore.Bucket, name string, metrics *Metrics) objstore.Bucket { if metrics == nil { return bucketClient } - return objstore.WrapWith(bucketClient, metrics) + if metrics.BucketMetrics == nil { + reg := metrics.Registerer + reg = prometheus.WrapRegistererWithPrefix("loki_", reg) + reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg) + // Save metrics to be assigned to other buckets created with the same component name + metrics.BucketMetrics = objstore.BucketMetrics(reg, "") + } + + return objstore.WrapWith(bucketClient, metrics.BucketMetrics) } diff --git a/pkg/storage/chunk/client/client.go b/pkg/storage/chunk/client/client.go index f6fe80ca21891..d89c540b29efa 100644 --- a/pkg/storage/chunk/client/client.go +++ b/pkg/storage/chunk/client/client.go @@ -4,10 +4,11 @@ import ( "context" "errors" - "github.com/grafana/loki/v3/pkg/storage/chunk" - "github.com/grafana/loki/v3/pkg/storage/stores/series/index" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/storage/chunk" + "github.com/grafana/loki/v3/pkg/storage/stores/series/index" ) var ( diff --git a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go b/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go index 432e673093a74..daf9c101a6ec7 100644 --- a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go +++ b/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go @@ -72,6 +72,21 @@ func (s *GCSThanosObjectClient) ObjectExists(ctx context.Context, objectKey stri return s.client.Exists(ctx, objectKey) } +// ObjectExistsWithSize checks if a given objectKey exists and it's size in the GCS bucket +func (s *GCSThanosObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) { + _, err := s.client.Get(ctx, objectKey) + if err != nil { + return false, 0, err + } + + attr, err := s.client.Attributes(ctx, objectKey) + if err != nil { + return true, 0, nil + } + + return true, attr.Size, nil +} + // PutObject puts the specified bytes into the configured GCS bucket at the provided key func (s *GCSThanosObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error { return s.client.Upload(ctx, objectKey, object)