Skip to content

Commit

Permalink
test: Add logging for empty blooms (#13537)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Jul 16, 2024
1 parent 3ac130b commit b44517a
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 14 deletions.
5 changes: 5 additions & 0 deletions pkg/bloombuild/builder/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func NewSimpleBloomGenerator(
opts.Schema.NGramSkip(),
int(opts.UnencodedBlockOptions.MaxBloomSizeBytes),
metrics,
log.With(
logger,
"component", "bloom_tokenizer",
"org_id", userID,
),
),
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/bloomcompactor/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func NewSimpleBloomGenerator(
opts.Schema.NGramSkip(),
int(opts.UnencodedBlockOptions.MaxBloomSizeBytes),
metrics.bloomMetrics,
log.With(
logger,
"component", "bloom_tokenizer",
"org_id", userID,
),
),
}
}
Expand Down
26 changes: 21 additions & 5 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import (
"math"
"unsafe"

"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/push"

"github.com/grafana/loki/v3/pkg/iter"
v2iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
"github.com/grafana/loki/v3/pkg/util/encoding"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

/*
Expand All @@ -24,6 +23,7 @@ Bloom filters are utilized for faster lookups of log lines.
*/
type BloomTokenizer struct {
metrics *Metrics
logger log.Logger

maxBloomSize int // size in bytes
lineTokenizer *NGramTokenizer
Expand All @@ -39,11 +39,11 @@ const eightBits = 8
// 1) The token slices generated must not be mutated externally
// 2) The token slice must not be used after the next call to `Tokens()` as it will repopulate the slice.
// 2) This is not thread safe.
func NewBloomTokenizer(nGramLen, nGramSkip int, maxBloomSize int, metrics *Metrics) *BloomTokenizer {
// TODO(chaudum): Replace logger
level.Info(util_log.Logger).Log("msg", "create new bloom tokenizer", "ngram length", nGramLen, "ngram skip", nGramSkip)
func NewBloomTokenizer(nGramLen, nGramSkip int, maxBloomSize int, metrics *Metrics, logger log.Logger) *BloomTokenizer {
level.Info(logger).Log("msg", "create new bloom tokenizer", "ngram length", nGramLen, "ngram skip", nGramSkip)
return &BloomTokenizer{
metrics: metrics,
logger: logger,
cache: make(map[string]interface{}, cacheSize),
lineTokenizer: NewNGramTokenizer(nGramLen, nGramSkip),
maxBloomSize: maxBloomSize,
Expand Down Expand Up @@ -120,6 +120,16 @@ func (bt *BloomTokenizer) Populate(
if next {
// The last bloom has been made available via the `Next()` call above
bloom = blooms.At()

// TODO(salvacorts): Delete this once we solve the correctness bug
// We noticed some blooms are empty on the resulting blocks.
// We have the feeling that the empty blooms may be reused from old blocks.
// Here we log an error if we find an empty bloom.
if bloom.Count() == 0 {
level.Warn(bt.logger).Log(
"msg", "found existing empty bloom",
)
}
} else {
bloom = bt.newBloom()
}
Expand Down Expand Up @@ -155,7 +165,13 @@ func (bt *BloomTokenizer) Populate(

break
}
}

// TODO(salvacorts): Delete this once we solve the correctness bug
if bloom.Count() == 0 {
level.Warn(bt.logger).Log(
"msg", "resulting bloom is empty",
)
}

// Send the last bloom
Expand Down
18 changes: 9 additions & 9 deletions pkg/storage/bloom/v1/bloom_tokenizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"testing"
"time"

logger "github.com/go-kit/log"
"github.com/grafana/dskit/multierror"

"github.com/grafana/loki/pkg/push"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/iter"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestPrefixedKeyCreation(t *testing.T) {

func TestSetLineTokenizer(t *testing.T) {
t.Parallel()
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())

// Validate defaults
require.Equal(t, bt.lineTokenizer.N(), DefaultNGramLength)
Expand All @@ -97,7 +97,7 @@ func TestSetLineTokenizer(t *testing.T) {
func TestTokenizerPopulate(t *testing.T) {
t.Parallel()
var testLine = "this is a log line"
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())

sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)

Expand Down Expand Up @@ -138,7 +138,7 @@ func TestTokenizerPopulate(t *testing.T) {

func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) {
var testLine = "this is a log line"
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())

memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
_, _ = memChunk.Append(&push.Entry{
Expand Down Expand Up @@ -206,7 +206,7 @@ func randomStr(ln int) string {

func TestTokenizerPopulateWontExceedMaxSize(t *testing.T) {
maxSize := 2048
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, maxSize, NewMetrics(nil))
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, maxSize, NewMetrics(nil), logger.NewNopLogger())
ch := make(chan *BloomCreation)
line := randomStr(10e3)
itr, err := chunkRefItrFromLines(line)
Expand Down Expand Up @@ -257,7 +257,7 @@ func populateAndConsumeBloom(
func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
for i := 0; i < b.N; i++ {
var testLine = lorem + lorem + lorem
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())

sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)

Expand Down Expand Up @@ -290,7 +290,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
}

func TestTokenizerClearsCacheBetweenPopulateCalls(t *testing.T) {
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, NewMetrics(nil))
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, NewMetrics(nil), logger.NewNopLogger())
line := "foobarbazz"
var blooms []*Bloom

Expand Down Expand Up @@ -329,7 +329,7 @@ func TestTokenizerClearsCacheBetweenPopulateCalls(t *testing.T) {
}

func BenchmarkMapClear(b *testing.B) {
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())
for i := 0; i < b.N; i++ {
for k := 0; k < cacheSize; k++ {
bt.cache[fmt.Sprint(k)] = k
Expand All @@ -340,7 +340,7 @@ func BenchmarkMapClear(b *testing.B) {
}

func BenchmarkNewMap(b *testing.B) {
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics)
bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger())
for i := 0; i < b.N; i++ {
for k := 0; k < cacheSize; k++ {
bt.cache[fmt.Sprint(k)] = k
Expand Down

0 comments on commit b44517a

Please sign in to comment.