diff --git a/pkg/bloombuild/builder/spec.go b/pkg/bloombuild/builder/spec.go
index 3feca8f49a3b..abb6cef1447f 100644
--- a/pkg/bloombuild/builder/spec.go
+++ b/pkg/bloombuild/builder/spec.go
@@ -94,6 +94,11 @@ func NewSimpleBloomGenerator(
 			opts.Schema.NGramSkip(),
 			int(opts.UnencodedBlockOptions.MaxBloomSizeBytes),
 			metrics,
+			log.With(
+				logger,
+				"component", "bloom_tokenizer",
+				"org_id", userID,
+			),
 		),
 	}
 }
diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go
index 7a1d000dde70..61cd8f1d06a4 100644
--- a/pkg/bloomcompactor/spec.go
+++ b/pkg/bloomcompactor/spec.go
@@ -94,6 +94,11 @@ func NewSimpleBloomGenerator(
 			opts.Schema.NGramSkip(),
 			int(opts.UnencodedBlockOptions.MaxBloomSizeBytes),
 			metrics.bloomMetrics,
+			log.With(
+				logger,
+				"component", "bloom_tokenizer",
+				"org_id", userID,
+			),
 		),
 	}
 }
diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go
index af5060e2c0c4..274f4c37f25d 100644
--- a/pkg/storage/bloom/v1/bloom_tokenizer.go
+++ b/pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -4,16 +4,15 @@ import (
 	"math"
 	"unsafe"
 
+	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 
 	"github.com/grafana/loki/pkg/push"
-
 	"github.com/grafana/loki/v3/pkg/iter"
 	v2iter "github.com/grafana/loki/v3/pkg/iter/v2"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
 	"github.com/grafana/loki/v3/pkg/util/encoding"
-	util_log "github.com/grafana/loki/v3/pkg/util/log"
 )
 
 /*
@@ -24,6 +23,7 @@ Bloom filters are utilized for faster lookups of log lines.
 */
 type BloomTokenizer struct {
 	metrics *Metrics
+	logger  log.Logger
 
 	maxBloomSize  int // size in bytes
 	lineTokenizer *NGramTokenizer
@@ -39,11 +39,11 @@ const eightBits = 8
 // 1) The token slices generated must not be mutated externally
 // 2) The token slice must not be used after the next call to `Tokens()` as it will repopulate the slice.
 // 3) This is not thread safe.
-func NewBloomTokenizer(nGramLen, nGramSkip int, maxBloomSize int, metrics *Metrics) *BloomTokenizer {
-	// TODO(chaudum): Replace logger
-	level.Info(util_log.Logger).Log("msg", "create new bloom tokenizer", "ngram length", nGramLen, "ngram skip", nGramSkip)
+func NewBloomTokenizer(nGramLen, nGramSkip int, maxBloomSize int, metrics *Metrics, logger log.Logger) *BloomTokenizer {
+	level.Info(logger).Log("msg", "create new bloom tokenizer", "ngram length", nGramLen, "ngram skip", nGramSkip)
 	return &BloomTokenizer{
 		metrics:       metrics,
+		logger:        logger,
 		cache:         make(map[string]interface{}, cacheSize),
 		lineTokenizer: NewNGramTokenizer(nGramLen, nGramSkip),
 		maxBloomSize:  maxBloomSize,
@@ -120,6 +120,16 @@ func (bt *BloomTokenizer) Populate(
 	if next {
 		// The last bloom has been made available via the `Next()` call above
 		bloom = blooms.At()
+
+		// TODO(salvacorts): Delete this once we solve the correctness bug
+		// We noticed that some blooms in the resulting blocks are empty.
+		// We suspect that the empty blooms may be reused from old blocks.
+		// Here we log a warning if we find an empty bloom.
+		if bloom.Count() == 0 {
+			level.Warn(bt.logger).Log(
+				"msg", "found existing empty bloom",
+			)
+		}
 	} else {
 		bloom = bt.newBloom()
 	}
@@ -155,7 +165,13 @@ func (bt *BloomTokenizer) Populate(
 			break
 		}
+	}
+	// TODO(salvacorts): Delete this once we solve the correctness bug
+	if bloom.Count() == 0 {
+		level.Warn(bt.logger).Log(
+			"msg", "resulting bloom is empty",
+		)
 	}
 
 	// Send the last bloom
diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go
index 7023958eca11..8f3e4f473e93 100644
--- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go
+++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go
@@ -8,10 +8,10 @@ import (
 	"testing"
 	"time"
 
+	logger "github.com/go-kit/log"
 	"github.com/grafana/dskit/multierror"
 
 	"github.com/grafana/loki/pkg/push"
-
 	"github.com/grafana/loki/v3/pkg/chunkenc"
 	"github.com/grafana/loki/v3/pkg/iter"
 	v2 "github.com/grafana/loki/v3/pkg/iter/v2"
@@ -82,7 +82,7 @@ func TestPrefixedKeyCreation(t *testing.T) {
 func TestSetLineTokenizer(t *testing.T) {
 	t.Parallel()
 
-	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
+	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())
 
 	// Validate defaults
 	require.Equal(t, bt.lineTokenizer.N(), DefaultNGramLength)
@@ -97,7 +97,7 @@ func TestTokenizerPopulate(t *testing.T) {
 	t.Parallel()
 	var testLine = "this is a log line"
-	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
+	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())
 
 	sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
 
@@ -138,7 +138,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) {
 	var testLine = "this is a log line"
-	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
+	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())
 
 	memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
 	_, _ = memChunk.Append(&push.Entry{
@@ -206,7 +206,7 @@ func randomStr(ln int) string {
 
 func TestTokenizerPopulateWontExceedMaxSize(t *testing.T) {
 	maxSize := 2048
-	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, maxSize, NewMetrics(nil))
+	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, maxSize, NewMetrics(nil), logger.NewNopLogger())
 	ch := make(chan *BloomCreation)
 	line := randomStr(10e3)
 	itr, err := chunkRefItrFromLines(line)
@@ -257,7 +257,7 @@ func populateAndConsumeBloom(
 func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		var testLine = lorem + lorem + lorem
-		bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
+		bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())
 
 		sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
 
@@ -290,7 +290,7 @@
 }
 
 func TestTokenizerClearsCacheBetweenPopulateCalls(t *testing.T) {
-	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, NewMetrics(nil))
+	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, NewMetrics(nil), logger.NewNopLogger())
 	line := "foobarbazz"
 	var blooms []*Bloom
 
@@ -329,7 +329,7 @@
 }
 
 func BenchmarkMapClear(b *testing.B) {
-	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
+	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())
 	for i := 0; i < b.N; i++ {
 		for k := 0; k < cacheSize; k++ {
 			bt.cache[fmt.Sprint(k)] = k
@@ -340,7 +340,7 @@
 }
 
 func BenchmarkNewMap(b *testing.B) {
-	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
+	bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())
 	for i := 0; i < b.N; i++ {
 		for k := 0; k < cacheSize; k++ {
 			bt.cache[fmt.Sprint(k)] = k
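Usage sketch (not part of the patch): with the new signature, a caller outside this diff would wire the tokenizer up roughly as below. The standalone main package, the nil Prometheus registerer passed to NewMetrics, the tenant ID, and the choice of n-gram parameters are illustrative assumptions, not values mandated by the change.

package main

import (
	"os"

	"github.com/go-kit/log"

	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
)

func main() {
	// Contextual logger, mirroring the log.With calls added in both spec.go files.
	logger := log.With(
		log.NewLogfmtLogger(os.Stderr),
		"component", "bloom_tokenizer",
		"org_id", "example-tenant", // hypothetical tenant ID
	)

	// The logger is now the final constructor argument; maxBloomSize = 0
	// leaves the bloom size uncapped, as in most of the updated tests.
	bt := v1.NewBloomTokenizer(v1.DefaultNGramLength, v1.DefaultNGramSkip, 0, v1.NewMetrics(nil), logger)
	_ = bt // feed series through bt.Populate as the builder in spec.go does
}

Callers that do not care about the output can keep the constructor quiet with a nop logger, which is why every call site in bloom_tokenizer_test.go passes logger.NewNopLogger() rather than a real logger.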