chore: Improve performance of structured metadata (#13404)
cyriltovena authored Jul 4, 2024
1 parent c694953 commit 1e33ae1
Showing 7 changed files with 101 additions and 32 deletions.
14 changes: 7 additions & 7 deletions pkg/chunkenc/memchunk.go
@@ -8,7 +8,6 @@ import (
"hash"
"hash/crc32"
"io"
"reflect"
"time"
"unsafe"

@@ -1305,11 +1304,7 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra
}

func unsafeGetBytes(s string) []byte {
var buf []byte
p := unsafe.Pointer(&buf)
*(*string)(p) = s
(*reflect.SliceHeader)(p).Cap = len(s)
return buf
return unsafe.Slice(unsafe.StringData(s), len(s))
}
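
The one-line body above replaces the reflect.SliceHeader trick with the unsafe.Slice / unsafe.StringData form. A minimal standalone sketch of that zero-copy conversion, assuming Go 1.20+; stringToBytes and bytesToString are illustrative names, not Loki's:

package main

import (
    "fmt"
    "unsafe"
)

// stringToBytes returns a []byte view over s without copying. The result
// aliases the string's backing array, so it must be treated as read-only.
func stringToBytes(s string) []byte {
    return unsafe.Slice(unsafe.StringData(s), len(s))
}

// bytesToString is the inverse zero-copy view.
func bytesToString(b []byte) string {
    return unsafe.String(unsafe.SliceData(b), len(b))
}

func main() {
    b := stringToBytes("structured metadata")
    fmt.Println(len(b), cap(b)) // 19 19
    fmt.Println(bytesToString(b))
}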

type bufferedIterator struct {
@@ -1560,7 +1555,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {
si.stats.AddDecompressedStructuredMetadataBytes(decompressedStructuredMetadataBytes)
si.stats.AddDecompressedBytes(decompressedBytes + decompressedStructuredMetadataBytes)

return ts, si.buf[:lineSize], si.symbolizer.Lookup(si.symbolsBuf[:nSymbols]), true
return ts, si.buf[:lineSize], si.symbolizer.Lookup(si.symbolsBuf[:nSymbols], si.currStructuredMetadata), true
}

func (si *bufferedIterator) Err() error { return si.err }
@@ -1589,6 +1584,11 @@ func (si *bufferedIterator) close() {
si.symbolsBuf = nil
}

if si.currStructuredMetadata != nil {
structuredMetadataPool.Put(si.currStructuredMetadata) // nolint:staticcheck
si.currStructuredMetadata = nil
}

si.origBytes = nil
}

26 changes: 20 additions & 6 deletions pkg/chunkenc/memchunk_test.go
@@ -99,7 +99,6 @@ func TestBlocksInclusive(t *testing.T) {
require.Equal(t, 1, blocks[0].Entries())
}
}

}

func TestBlock(t *testing.T) {
@@ -387,7 +386,6 @@ func TestRoundtripV2(t *testing.T) {
assertLines(loaded)
})
}

}
}

@@ -886,9 +884,11 @@ func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsRes
func (nomatchPipeline) Process(_ int64, line []byte, _ ...labels.Label) ([]byte, log.LabelsResult, bool) {
return line, nil, false
}

func (nomatchPipeline) ProcessString(_ int64, line string, _ ...labels.Label) (string, log.LabelsResult, bool) {
return line, nil, false
}

func (nomatchPipeline) ReferencedStructuredMetadata() bool {
return false
}
@@ -947,16 +947,31 @@ func BenchmarkRead(b *testing.B) {
}
}

type noopTestPipeline struct{}

func (noopTestPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult }
func (noopTestPipeline) Process(_ int64, line []byte, _ ...labels.Label) ([]byte, log.LabelsResult, bool) {
return line, nil, false
}

func (noopTestPipeline) ProcessString(_ int64, line string, _ ...labels.Label) (string, log.LabelsResult, bool) {
return line, nil, false
}

func (noopTestPipeline) ReferencedStructuredMetadata() bool {
return false
}

func BenchmarkBackwardIterator(b *testing.B) {
for _, bs := range testBlockSizes {
b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) {
b.ReportAllocs()
c := NewMemChunk(ChunkFormatV3, EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize)
c := NewMemChunk(ChunkFormatV4, EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize)
_ = fillChunk(c)
b.ResetTimer()
for n := 0; n < b.N; n++ {
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{})
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noopStreamPipeline)
noop := noopTestPipeline{}
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noop)
if err != nil {
panic(err)
}
@@ -1754,7 +1769,6 @@ func TestMemChunk_SpaceFor(t *testing.T) {
require.Equal(t, expect, chk.SpaceFor(&tc.entry))
})
}

})
}
}
20 changes: 14 additions & 6 deletions pkg/chunkenc/symbols.go
@@ -15,6 +15,12 @@ import (
"github.com/grafana/loki/v3/pkg/util"
)

var structuredMetadataPool = sync.Pool{
New: func() interface{} {
return make(labels.Labels, 0, 8)
},
}

// symbol holds reference to a label name and value pair
type symbol struct {
Name, Value uint32
@@ -90,18 +96,20 @@ func (s *symbolizer) add(lbl string) uint32 {
}

// Lookup converts and returns label pairs for the given symbols
func (s *symbolizer) Lookup(syms symbols) labels.Labels {
func (s *symbolizer) Lookup(syms symbols, buf labels.Labels) labels.Labels {
if len(syms) == 0 {
return nil
}
lbls := make([]labels.Label, len(syms))
if buf == nil {
buf = structuredMetadataPool.Get().(labels.Labels)
}
buf = buf[:0]

for i, symbol := range syms {
lbls[i].Name = s.lookup(symbol.Name)
lbls[i].Value = s.lookup(symbol.Value)
for _, symbol := range syms {
buf = append(buf, labels.Label{Name: s.lookup(symbol.Name), Value: s.lookup(symbol.Value)})
}

return lbls
return buf
}
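
Lookup now takes a reusable buffer and falls back to structuredMetadataPool when the caller has none, so a fresh label slice is no longer allocated per entry. A minimal sketch of the same get / reset / append pattern, assuming the default Prometheus labels build where labels.Labels is a slice of labels.Label; labelPool and lookupInto are illustrative names, not Loki's:

package main

import (
    "fmt"
    "sync"

    "github.com/prometheus/prometheus/model/labels"
)

// labelPool hands out small, reusable label slices, mirroring structuredMetadataPool.
var labelPool = sync.Pool{
    New: func() interface{} { return make(labels.Labels, 0, 8) },
}

// lookupInto appends resolved pairs into buf, pulling a pooled slice when the
// caller has none yet, so steady-state iteration allocates nothing.
func lookupInto(pairs [][2]string, buf labels.Labels) labels.Labels {
    if buf == nil {
        buf = labelPool.Get().(labels.Labels)
    }
    buf = buf[:0]
    for _, p := range pairs {
        buf = append(buf, labels.Label{Name: p[0], Value: p[1]})
    }
    return buf
}

func main() {
    buf := lookupInto([][2]string{{"traceID", "abc123"}, {"pod", "ingester-0"}}, nil)
    fmt.Println(buf)
    labelPool.Put(buf) // nolint:staticcheck
}

As in the commit, putting a slice (rather than a pointer) into a sync.Pool trips staticcheck SA6002, which is why the Loki code carries the nolint:staticcheck directive.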

func (s *symbolizer) lookup(idx uint32) string {
8 changes: 4 additions & 4 deletions pkg/chunkenc/symbols_test.go
@@ -131,7 +131,7 @@ func TestSymbolizer(t *testing.T) {
for i, labels := range tc.labelsToAdd {
symbols := s.Add(labels)
require.Equal(t, tc.expectedSymbols[i], symbols)
require.Equal(t, labels, s.Lookup(symbols))
require.Equal(t, labels, s.Lookup(symbols, nil))
}

// Test that Lookup returns empty labels if no symbols are provided.
@@ -141,7 +141,7 @@
Name: 0,
Value: 0,
},
})
}, nil)
require.Equal(t, "", ret[0].Name)
require.Equal(t, "", ret[0].Value)
}
@@ -157,7 +157,7 @@

loaded := symbolizerFromCheckpoint(buf.Bytes())
for i, symbols := range tc.expectedSymbols {
require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols))
require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols, nil))
}

buf.Reset()
@@ -167,7 +167,7 @@
loaded, err = symbolizerFromEnc(buf.Bytes(), GetReaderPool(encoding))
require.NoError(t, err)
for i, symbols := range tc.expectedSymbols {
require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols))
require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols, nil))
}
})
}
25 changes: 18 additions & 7 deletions pkg/chunkenc/unordered.go
@@ -5,7 +5,6 @@ import (
"context"
"encoding/binary"
"fmt"

"io"
"math"
"time"
@@ -252,13 +251,15 @@ func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.D
// cutting of blocks.
streams := map[string]*logproto.Stream{}
baseHash := pipeline.BaseLabels().Hash()
var structuredMetadata labels.Labels
_ = hb.forEntries(
ctx,
direction,
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)...)
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata)
newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, structuredMetadata...)
if !matches {
return nil
}
@@ -294,7 +295,13 @@ func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.D
for _, stream := range streams {
streamsResult = append(streamsResult, *stream)
}
return iter.NewStreamsIterator(streamsResult, direction)

return iter.EntryIteratorWithClose(iter.NewStreamsIterator(streamsResult, direction), func() error {
if structuredMetadata != nil {
structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck
}
return nil
})
}
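
The iterator above now fills a single structuredMetadata buffer for the whole block instead of allocating a slice for every entry. This relies on the pipeline reading the variadic labels for the current entry only and not retaining the slice across calls (an assumption of this sketch), with the buffer handed back to the pool only when the iterator is closed. A dependency-free illustration of that reuse-and-overwrite pattern, where consume stands in for pipeline.ProcessString and all names are illustrative:

package main

import "fmt"

type label struct{ name, value string }

// consume stands in for pipeline.ProcessString: it reads the labels for one
// entry and keeps only what it derives from them, never the slice itself.
func consume(line string, lbls ...label) string {
    out := line
    for _, l := range lbls {
        out += " " + l.name + "=" + l.value
    }
    return out
}

func main() {
    buf := make([]label, 0, 8) // one reusable buffer for the whole iteration
    entries := []struct {
        line string
        md   []label
    }{
        {"first line", []label{{"traceID", "abc"}}},
        {"second line", []label{{"traceID", "def"}, {"user", "42"}}},
    }
    for _, entry := range entries {
        buf = append(buf[:0], entry.md...) // overwrite in place, no per-entry allocation
        fmt.Println(consume(entry.line, buf...))
    }
}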

// nolint:unused
@@ -306,13 +313,15 @@ func (hb *unorderedHeadBlock) SampleIterator(
) iter.SampleIterator {
series := map[string]*logproto.Series{}
baseHash := extractor.BaseLabels().Hash()
var structuredMetadata labels.Labels
_ = hb.forEntries(
ctx,
logproto.FORWARD,
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
value, parsedLabels, ok := extractor.ProcessString(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)...)
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata)
value, parsedLabels, ok := extractor.ProcessString(ts, line, structuredMetadata...)
if !ok {
return nil
}
@@ -355,6 +364,9 @@ func (hb *unorderedHeadBlock) SampleIterator(
for _, s := range series {
SamplesPool.Put(s.Samples)
}
if structuredMetadata != nil {
structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck
}
return nil
})
}
@@ -445,7 +457,7 @@ func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt, symbolizer *symboliz
0,
math.MaxInt64,
func(_ *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
_, err := out.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols))
_, err := out.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols, nil))
return err
},
)
@@ -585,8 +597,7 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
}
}
}

if _, err := hb.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)); err != nil {
if _, err := hb.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols, nil)); err != nil {
return err
}
}
7 changes: 7 additions & 0 deletions pkg/chunkenc/util_test.go
@@ -53,6 +53,13 @@ func fillChunkClose(c Chunk, close bool) int64 {
entry := &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: testdata.LogString(i),
StructuredMetadata: []logproto.LabelAdapter{
{Name: "foo", Value: "bar"},
{Name: "baz", Value: "buzz"},
{Name: "qux", Value: "quux"},
{Name: "corge", Value: "grault"},
{Name: "garply", Value: "waldo"},
},
}
for c.SpaceFor(entry) {
_, err := c.Append(entry)
33 changes: 31 additions & 2 deletions pkg/iter/entry_iterator.go
@@ -91,7 +91,6 @@ func (i *mergeEntryIterator) closeEntry(e EntryIterator) {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", e.Close)

}

func (i *mergeEntryIterator) Push(ei EntryIterator) {
@@ -294,8 +293,8 @@ func (i *entrySortIterator) closeEntry(e EntryIterator) {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", e.Close)

}

func (i *entrySortIterator) Next() bool {
ret := i.tree.Next()
if !ret {
@@ -810,3 +809,33 @@ func (it *peekingEntryIterator) Err() error {
func (it *peekingEntryIterator) Close() error {
return it.iter.Close()
}

type withCloseEntryIterator struct {
closeOnce sync.Once
closeFn func() error
errs []error
EntryIterator
}

func (w *withCloseEntryIterator) Close() error {
w.closeOnce.Do(func() {
if err := w.EntryIterator.Close(); err != nil {
w.errs = append(w.errs, err)
}
if err := w.closeFn(); err != nil {
w.errs = append(w.errs, err)
}
})
if len(w.errs) == 0 {
return nil
}
return util.MultiError(w.errs)
}

func EntryIteratorWithClose(it EntryIterator, closeFn func() error) EntryIterator {
return &withCloseEntryIterator{
closeOnce: sync.Once{},
closeFn: closeFn,
EntryIterator: it,
}
}
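
A hedged usage sketch for the helper above, showing how a caller attaches cleanup (here, returning a pooled buffer, as unordered.go does) that runs exactly once no matter how many times Close is called. labelBufPool and buf are illustrative stand-ins for the pooled structured-metadata buffer; the iterator construction mirrors the iter.NewStreamsIterator call used earlier in this commit:

package main

import (
    "fmt"
    "sync"
    "time"

    "github.com/grafana/loki/v3/pkg/iter"
    "github.com/grafana/loki/v3/pkg/logproto"
)

// labelBufPool stands in for chunkenc's structuredMetadataPool.
var labelBufPool = sync.Pool{New: func() interface{} { return make([]string, 0, 8) }}

func main() {
    buf := labelBufPool.Get().([]string)

    streams := []logproto.Stream{{
        Labels: `{app="example"}`,
        Entries: []logproto.Entry{
            {Timestamp: time.Unix(0, 1), Line: "first"},
            {Timestamp: time.Unix(0, 2), Line: "second"},
        },
    }}

    // Wrap the iterator so the buffer is returned to the pool on Close.
    it := iter.EntryIteratorWithClose(iter.NewStreamsIterator(streams, logproto.FORWARD), func() error {
        labelBufPool.Put(buf) // nolint:staticcheck
        return nil
    })

    entries := 0
    for it.Next() {
        entries++
    }
    fmt.Println("entries:", entries)

    // Close is idempotent: sync.Once ensures the cleanup callback runs only once,
    // and errors from the inner Close and the callback are collected together.
    _ = it.Close()
    _ = it.Close()
}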
