tokenizer v1 cleanup (#11272)
**What this PR does / why we need it**:
Removes all usage of the v1 tokenizers and renames v2 to v1, since these tokenizers were never released in production.
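
For reviewers, a rough before/after sketch of the rename as it appears in the diff below (the two halves target different revisions of the package, so this is illustrative only and will not compile as a single unit):

```go
line := "abcdef"

// Before (removed in this PR): the constructor took (min, max, skip) and
// Tokens returned a []Token slice whose elements carry a Key field.
old := NewNGramTokenizer(4, 5, 0)
for _, tok := range old.Tokens(line) {
    _ = tok.Key // []byte
}

// After: the former v2 tokenizer, now *NGramTokenizer, takes (N, skip) and
// Tokens returns an iterator of raw []byte n-grams.
cur := NewNGramTokenizer(4, 0)
itr := cur.Tokens(line)
for itr.Next() {
    _ = itr.At() // []byte
}
```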

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)
paul1r authored and jeschkies committed Nov 21, 2023
1 parent 7ef65b5 commit 0c94702
Showing 9 changed files with 183 additions and 1,137 deletions.
87 changes: 44 additions & 43 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -2,6 +2,7 @@ package v1
 
 import (
     "context"
+    "encoding/binary"
     "math"
     "time"
 
@@ -27,9 +28,8 @@ Bloom filters are utilized for faster lookups of log lines.
 type BloomTokenizer struct {
     metrics *metrics
 
-    lineTokenizer    Tokenizer
-    chunkIDTokenizer *WrappedTokenizer
-    cache            map[string]interface{}
+    lineTokenizer *NGramTokenizer
+    cache         map[string]interface{}
 }
 
 const CacheSize = 150000
@@ -46,17 +46,15 @@ func NewBloomTokenizer(reg prometheus.Registerer) (*BloomTokenizer, error) {
         metrics: newMetrics(reg),
     }
     t.cache = make(map[string]interface{}, CacheSize)
-    t.lineTokenizer = NewNGramTokenizer(DefaultNGramLength, DefaultNGramLength+1, DefaultNGramSkip) // default to 4-grams, no skip
-    t.chunkIDTokenizer = ChunkIDTokenizer(t.lineTokenizer)
+    t.lineTokenizer = NewNGramTokenizer(DefaultNGramLength, DefaultNGramSkip) // default to 4-grams, no skip
 
     level.Info(util_log.Logger).Log("bloom tokenizer created")
 
     return t, nil
 }
 
-func (bt *BloomTokenizer) SetLineTokenizer(t Tokenizer) {
+func (bt *BloomTokenizer) SetLineTokenizer(t *NGramTokenizer) {
     bt.lineTokenizer = t
-    bt.chunkIDTokenizer = ChunkIDTokenizer(bt.lineTokenizer)
 }
 
 // TODO: Something real here with metrics
@@ -70,12 +68,27 @@ func clearCache(cache map[string]interface{}) {
     }
 }
 
+func calculatePrefix(chk logproto.ChunkRef) []byte {
+    i64buf := make([]byte, binary.MaxVarintLen64)
+    i32buf := make([]byte, 4)
+    prefix := make([]byte, 32)
+
+    binary.PutVarint(i64buf, int64(chk.From))
+    prefix = append(prefix, i64buf...)
+    binary.PutVarint(i64buf, int64(chk.Through))
+    prefix = append(prefix, i64buf...)
+    binary.LittleEndian.PutUint32(i32buf, chk.Checksum)
+    prefix = append(prefix, i32buf...)
+
+    return prefix
+}
+
 // PopulateSeriesWithBloom is intended to be called on the write path, and is used to populate the bloom filter for a given series.
 func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) {
     clearCache(bt.cache)
     for idx := range chunks {
         lc := chunks[idx].Data.(*chunkenc.Facade).LokiChunk()
-        bt.chunkIDTokenizer.Reinit(chunks[idx].ChunkRef)
+        prefix := calculatePrefix(chunks[idx].ChunkRef)
 
         // TODO: error handling
         itr, err := lc.Iterator(
@@ -93,23 +106,41 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) {
         defer itr.Close()
 
         for itr.Next() && itr.Error() == nil {
-            toks := bt.chunkIDTokenizer.Tokens(itr.Entry().Line)
+            chunkTokenizer := NewPrefixedTokenIter(prefix, bt.lineTokenizer.Tokens(itr.Entry().Line))
+            for chunkTokenizer.Next() {
+                tok := chunkTokenizer.At()
+                if tok != nil {
+                    str := string(tok)
+                    _, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
+                    if !found {
+                        bt.cache[str] = nil
+
+                        seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok)
+
-            for _, tok := range toks {
-                if tok.Key != nil {
-                    str := string(tok.Key)
+                        if len(bt.cache) >= CacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
+                            clearCache(bt.cache)
+                        }
+                    }
+                }
+            }
+            lineTokenizer := bt.lineTokenizer.Tokens(itr.Entry().Line)
+            for lineTokenizer.Next() {
+                tok := lineTokenizer.At()
+                if tok != nil {
+                    str := string(tok)
                     _, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
                     if !found {
                         bt.cache[str] = nil
 
-                        seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok.Key)
+                        seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok)
 
                         if len(bt.cache) >= CacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
                             clearCache(bt.cache)
                         }
                     }
                 }
             }
 
         }
         seriesWithBloom.Series.Chunks = append(seriesWithBloom.Series.Chunks, ChunkRef{
             Start: chunks[idx].From,
@@ -118,33 +149,3 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) {
         })
     } // for each chunk
 }
-
-// SearchesForTokenizerAndLine is for taking a given search string (ex: on the read/query path) and returning
-// all the possible tokens, given a tokenizer.
-// This is a multi-dimensional slice where the first slice is the offset into the line, and the
-// second slice is the tokens for that offset. If an offset into the line returns no tokens, this first dimension
-// will be less than 1 + the number of skips specified in the tokenizer
-// The offset is used if the Tokenizer has a skip value being utilized.
-func SearchesForTokenizerAndLine(t Tokenizer, line string) (res [][]Token) {
-    res = make([][]Token, 0, 10)
-    for i := range line { // iterate by runes
-        if i >= t.GetSkip()+1 {
-            break
-        }
-        tmpTokens := make([]Token, 0, 100)
-        tokens := t.Tokens(line[i:])
-        // As the way the tokenizer is coded, it will reuse its internal buffers,
-        // but we need to save the data, hence the need for copying
-        for _, token := range tokens {
-            tmpToken := Token{}
-            tmpToken.Key = make([]byte, len(token.Key))
-            copy(tmpToken.Key, token.Key)
-            tmpTokens = append(tmpTokens, tmpToken)
-        }
-        if len(tokens) > 0 {
-            res = append(res, tmpTokens)
-        }
-    }
-
-    return res
-}
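
The net effect on the write path: each chunk contributes a byte prefix derived from its `From`, `Through`, and `Checksum`, and every log line's n-grams are added to the series' scalable bloom filter twice, once prefixed with that chunk identifier and once bare, with a small map acting as a dedupe cache in front of the filter. Below is a self-contained sketch of that flow; `toySet`, `ngrams`, and `chunkPrefix` are simplified stand-ins invented for illustration, not Loki's `ScalableBloomFilter`, `NGramTokenizer`, or `calculatePrefix`.

```go
package main

import (
    "encoding/binary"
    "fmt"
)

// toySet stands in for the scalable bloom filter: a plain set keyed by token bytes.
type toySet map[string]struct{}

// TestAndAdd reports whether tok was already present, then inserts it.
func (s toySet) TestAndAdd(tok []byte) bool {
    _, ok := s[string(tok)]
    s[string(tok)] = struct{}{}
    return ok
}

// ngrams returns the rune n-grams of line, starting a new gram every skip+1 runes.
// It mirrors the shape of the tokenizer's output, not its implementation.
func ngrams(line string, n, skip int) [][]byte {
    runes := []rune(line)
    var out [][]byte
    for i := 0; i+n <= len(runes); i += skip + 1 {
        out = append(out, []byte(string(runes[i:i+n])))
    }
    return out
}

// chunkPrefix mimics the idea of calculatePrefix: varint-encoded chunk bounds
// plus a little-endian checksum, concatenated into one byte slice.
func chunkPrefix(from, through int64, checksum uint32) []byte {
    prefix := make([]byte, 0, 32)
    i64buf := make([]byte, binary.MaxVarintLen64)
    i32buf := make([]byte, 4)

    n := binary.PutVarint(i64buf, from)
    prefix = append(prefix, i64buf[:n]...)
    n = binary.PutVarint(i64buf, through)
    prefix = append(prefix, i64buf[:n]...)
    binary.LittleEndian.PutUint32(i32buf, checksum)
    prefix = append(prefix, i32buf...)

    return prefix
}

func main() {
    bloom := toySet{}
    cache := map[string]struct{}{} // dedupe cache sitting in front of the filter

    prefix := chunkPrefix(1000, 2000, 0xdeadbeef) // example chunk bounds and checksum
    line := "2b1a5e46-36a2-4"

    for _, tok := range ngrams(line, 4, 0) {
        // Store both the chunk-scoped token (prefix + n-gram, the role played
        // by NewPrefixedTokenIter) and the bare n-gram.
        prefixed := append(append([]byte{}, prefix...), tok...)
        for _, t := range [][]byte{prefixed, tok} {
            if _, seen := cache[string(t)]; seen {
                continue
            }
            cache[string(t)] = struct{}{}
            bloom.TestAndAdd(t)
        }
    }

    fmt.Println("distinct tokens inserted:", len(bloom))
}
```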
100 changes: 14 additions & 86 deletions pkg/storage/bloom/v1/bloom_tokenizer_test.go
@@ -20,95 +20,21 @@ import (
     "github.com/prometheus/client_golang/prometheus"
 )
 
+var (
+    four = NewNGramTokenizer(4, 0)
+)
+
 func TestSetLineTokenizer(t *testing.T) {
     bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
 
     // Validate defaults
-    require.Equal(t, bt.lineTokenizer.GetMin(), DefaultNGramLength)
-    require.Equal(t, bt.lineTokenizer.GetMax(), DefaultNGramLength+1)
-    require.Equal(t, bt.lineTokenizer.GetSkip(), DefaultNGramSkip)
-
-    require.Equal(t, bt.chunkIDTokenizer.GetMin(), DefaultNGramLength)
-    require.Equal(t, bt.chunkIDTokenizer.GetMax(), DefaultNGramLength+1)
-    require.Equal(t, bt.chunkIDTokenizer.GetSkip(), DefaultNGramSkip)
+    require.Equal(t, bt.lineTokenizer.N, DefaultNGramLength)
+    require.Equal(t, bt.lineTokenizer.Skip, DefaultNGramSkip)
 
     // Set new tokenizer, and validate against that
-    bt.SetLineTokenizer(NewNGramTokenizer(6, 7, 2))
-    require.Equal(t, bt.lineTokenizer.GetMin(), 6)
-    require.Equal(t, bt.lineTokenizer.GetMax(), 7)
-    require.Equal(t, bt.lineTokenizer.GetSkip(), 2)
-
-    require.Equal(t, bt.chunkIDTokenizer.GetMin(), 6)
-    require.Equal(t, bt.chunkIDTokenizer.GetMax(), 7)
-    require.Equal(t, bt.chunkIDTokenizer.GetSkip(), 2)
-}
-
-func TestSearchesForTokenizerAndLine(t *testing.T) {
-    for _, tc := range []struct {
-        desc  string
-        input string
-        t     Tokenizer
-        exp   [][]Token
-    }{
-        {
-            desc:  "empty",
-            input: "",
-            t:     four,
-            exp:   [][]Token{},
-        },
-        {
-            desc:  "single char",
-            input: "a",
-            t:     four,
-            exp:   [][]Token{},
-        },
-        {
-            desc:  "four chars",
-            input: "abcd",
-            t:     four,
-            exp: [][]Token{
-                {{Key: []byte("abcd")}}},
-        },
-        {
-            desc:  "uuid partial",
-            input: "2b1a5e46-36a2-4",
-            t:     four,
-            exp: [][]Token{{
-                {Key: []byte("2b1a")},
-                {Key: []byte("b1a5")},
-                {Key: []byte("1a5e")},
-                {Key: []byte("a5e4")},
-                {Key: []byte("5e46")},
-                {Key: []byte("e46-")},
-                {Key: []byte("46-3")},
-                {Key: []byte("6-36")},
-                {Key: []byte("-36a")},
-                {Key: []byte("36a2")},
-                {Key: []byte("6a2-")},
-                {Key: []byte("a2-4")}},
-            },
-        },
-        {
-            desc:  "short special chars",
-            t:     four,
-            input: "日本語",
-            exp:   [][]Token{},
-        },
-        {
-            desc:  "longer special chars",
-            t:     four,
-            input: "日本語日本語",
-            exp: [][]Token{{
-                {Key: []byte("日本語日")},
-                {Key: []byte("本語日本")},
-                {Key: []byte("語日本語")}}},
-        },
-    } {
-        t.Run(tc.desc, func(t *testing.T) {
-            require.Equal(t, tc.exp, SearchesForTokenizerAndLine(tc.t, tc.input))
-        })
-    }
-
+    bt.SetLineTokenizer(NewNGramTokenizer(6, 7))
+    require.Equal(t, bt.lineTokenizer.N, 6)
+    require.Equal(t, bt.lineTokenizer.Skip, 7)
 }
 
 func TestPopulateSeriesWithBloom(t *testing.T) {
@@ -149,9 +75,11 @@ func TestPopulateSeriesWithBloom(t *testing.T) {
     }
 
     bt.PopulateSeriesWithBloom(&swb, chunks)
-    tokens := SearchesForTokenizerAndLine(four, testLine)
-    for _, token := range tokens[0] {
-        require.True(t, swb.Bloom.Test(token.Key))
+    tokenizer := NewNGramTokenizer(DefaultNGramLength, DefaultNGramSkip)
+    itr := tokenizer.Tokens(testLine)
+    for itr.Next() {
+        token := itr.At()
+        require.True(t, swb.Bloom.Test(token))
     }
 }

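On the read side, the removed `SearchesForTokenizerAndLine` helper expanded a query string into one token set per starting offset (at most skip+1 of them) so every alignment could be tested against the bloom; the updated `TestPopulateSeriesWithBloom` above shows that callers now iterate `Tokens` directly instead. The sketch below reproduces that per-offset expansion with a toy tokenizer; the `ngrams` and `searchTokens` helpers are illustrative stand-ins, not part of the Loki API.

```go
package main

import "fmt"

// ngrams returns the rune n-grams of line, starting a new gram every skip+1 runes.
func ngrams(line string, n, skip int) [][]byte {
    runes := []rune(line)
    var out [][]byte
    for i := 0; i+n <= len(runes); i += skip + 1 {
        out = append(out, []byte(string(runes[i:i+n])))
    }
    return out
}

// searchTokens mirrors the intent of the removed helper: one token slice per
// starting offset in [0, skip], dropping offsets that yield no tokens.
func searchTokens(line string, n, skip int) [][][]byte {
    runes := []rune(line)
    var res [][][]byte
    for i := 0; i < len(runes) && i < skip+1; i++ {
        toks := ngrams(string(runes[i:]), n, skip)
        if len(toks) > 0 {
            res = append(res, toks)
        }
    }
    return res
}

func main() {
    for offset, toks := range searchTokens("2b1a5e46-36a2-4", 4, 2) {
        fmt.Printf("offset %d: %d tokens, first %q\n", offset, len(toks), toks[0])
    }
}
```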
