// unsafeGetBytes reinterprets the bytes of s as a []byte without copying.
//
// The returned slice has length and capacity equal to len(s) and shares its
// storage with the string, so it must be treated as read-only: strings are
// immutable and writing through the slice is undefined behavior.
func unsafeGetBytes(s string) []byte {
	n := len(s)
	data := unsafe.StringData(s)
	return unsafe.Slice(data, n)
}
bool) { return line, nil, false } + func (nomatchPipeline) ProcessString(_ int64, line string, _ ...labels.Label) (string, log.LabelsResult, bool) { return line, nil, false } + func (nomatchPipeline) ReferencedStructuredMetadata() bool { return false } @@ -947,16 +947,31 @@ func BenchmarkRead(b *testing.B) { } } +type noopTestPipeline struct{} + +func (noopTestPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult } +func (noopTestPipeline) Process(_ int64, line []byte, _ ...labels.Label) ([]byte, log.LabelsResult, bool) { + return line, nil, false +} + +func (noopTestPipeline) ProcessString(_ int64, line string, _ ...labels.Label) (string, log.LabelsResult, bool) { + return line, nil, false +} + +func (noopTestPipeline) ReferencedStructuredMetadata() bool { + return false +} + func BenchmarkBackwardIterator(b *testing.B) { for _, bs := range testBlockSizes { b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) { b.ReportAllocs() - c := NewMemChunk(ChunkFormatV3, EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize) + c := NewMemChunk(ChunkFormatV4, EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize) _ = fillChunk(c) b.ResetTimer() for n := 0; n < b.N; n++ { - noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) - iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noopStreamPipeline) + noop := noopTestPipeline{} + iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noop) if err != nil { panic(err) } @@ -1754,7 +1769,6 @@ func TestMemChunk_SpaceFor(t *testing.T) { require.Equal(t, expect, chk.SpaceFor(&tc.entry)) }) } - }) } } diff --git a/pkg/chunkenc/symbols.go b/pkg/chunkenc/symbols.go index a3e2a5b011e2..e9f0b4952968 100644 --- a/pkg/chunkenc/symbols.go +++ b/pkg/chunkenc/symbols.go @@ -15,6 +15,12 @@ import ( "github.com/grafana/loki/v3/pkg/util" ) +var structuredMetadataPool = sync.Pool{ + New: func() interface{} { + return 
make(labels.Labels, 0, 8) + }, +} + // symbol holds reference to a label name and value pair type symbol struct { Name, Value uint32 @@ -90,18 +96,20 @@ func (s *symbolizer) add(lbl string) uint32 { } // Lookup coverts and returns labels pairs for the given symbols -func (s *symbolizer) Lookup(syms symbols) labels.Labels { +func (s *symbolizer) Lookup(syms symbols, buf labels.Labels) labels.Labels { if len(syms) == 0 { return nil } - lbls := make([]labels.Label, len(syms)) + if buf == nil { + buf = structuredMetadataPool.Get().(labels.Labels) + } + buf = buf[:0] - for i, symbol := range syms { - lbls[i].Name = s.lookup(symbol.Name) - lbls[i].Value = s.lookup(symbol.Value) + for _, symbol := range syms { + buf = append(buf, labels.Label{Name: s.lookup(symbol.Name), Value: s.lookup(symbol.Value)}) } - return lbls + return buf } func (s *symbolizer) lookup(idx uint32) string { diff --git a/pkg/chunkenc/symbols_test.go b/pkg/chunkenc/symbols_test.go index 86f3440662af..7882001c75dd 100644 --- a/pkg/chunkenc/symbols_test.go +++ b/pkg/chunkenc/symbols_test.go @@ -131,7 +131,7 @@ func TestSymbolizer(t *testing.T) { for i, labels := range tc.labelsToAdd { symbols := s.Add(labels) require.Equal(t, tc.expectedSymbols[i], symbols) - require.Equal(t, labels, s.Lookup(symbols)) + require.Equal(t, labels, s.Lookup(symbols, nil)) } // Test that Lookup returns empty labels if no symbols are provided. 
@@ -141,7 +141,7 @@ func TestSymbolizer(t *testing.T) { Name: 0, Value: 0, }, - }) + }, nil) require.Equal(t, "", ret[0].Name) require.Equal(t, "", ret[0].Value) } @@ -157,7 +157,7 @@ func TestSymbolizer(t *testing.T) { loaded := symbolizerFromCheckpoint(buf.Bytes()) for i, symbols := range tc.expectedSymbols { - require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols)) + require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols, nil)) } buf.Reset() @@ -167,7 +167,7 @@ func TestSymbolizer(t *testing.T) { loaded, err = symbolizerFromEnc(buf.Bytes(), GetReaderPool(encoding)) require.NoError(t, err) for i, symbols := range tc.expectedSymbols { - require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols)) + require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols, nil)) } }) } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 683a4f17e2ad..aed6606c7c6d 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -5,7 +5,6 @@ import ( "context" "encoding/binary" "fmt" - "io" "math" "time" @@ -252,13 +251,15 @@ func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.D // cutting of blocks. streams := map[string]*logproto.Stream{} baseHash := pipeline.BaseLabels().Hash() + var structuredMetadata labels.Labels _ = hb.forEntries( ctx, direction, mint, maxt, func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error { - newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)...) + structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata) + newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, structuredMetadata...) 
if !matches { return nil } @@ -294,7 +295,13 @@ func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.D for _, stream := range streams { streamsResult = append(streamsResult, *stream) } - return iter.NewStreamsIterator(streamsResult, direction) + + return iter.EntryIteratorWithClose(iter.NewStreamsIterator(streamsResult, direction), func() error { + if structuredMetadata != nil { + structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck + } + return nil + }) } // nolint:unused @@ -306,13 +313,15 @@ func (hb *unorderedHeadBlock) SampleIterator( ) iter.SampleIterator { series := map[string]*logproto.Series{} baseHash := extractor.BaseLabels().Hash() + var structuredMetadata labels.Labels _ = hb.forEntries( ctx, logproto.FORWARD, mint, maxt, func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error { - value, parsedLabels, ok := extractor.ProcessString(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)...) + structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata) + value, parsedLabels, ok := extractor.ProcessString(ts, line, structuredMetadata...) 
if !ok { return nil } @@ -355,6 +364,9 @@ func (hb *unorderedHeadBlock) SampleIterator( for _, s := range series { SamplesPool.Put(s.Samples) } + if structuredMetadata != nil { + structuredMetadataPool.Put(structuredMetadata) // nolint:staticcheck + } return nil }) } @@ -445,7 +457,7 @@ func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt, symbolizer *symboliz 0, math.MaxInt64, func(_ *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error { - _, err := out.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)) + _, err := out.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols, nil)) return err }, ) @@ -585,8 +597,7 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { } } } - - if _, err := hb.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)); err != nil { + if _, err := hb.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols, nil)); err != nil { return err } } diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index de74f7946e2a..3da8f9e6d5cb 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -53,6 +53,13 @@ func fillChunkClose(c Chunk, close bool) int64 { entry := &logproto.Entry{ Timestamp: time.Unix(0, 0), Line: testdata.LogString(i), + StructuredMetadata: []logproto.LabelAdapter{ + {Name: "foo", Value: "bar"}, + {Name: "baz", Value: "buzz"}, + {Name: "qux", Value: "quux"}, + {Name: "corge", Value: "grault"}, + {Name: "garply", Value: "waldo"}, + }, } for c.SpaceFor(entry) { _, err := c.Append(entry) diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go index b0b5a4635d1e..58e0ab929e7f 100644 --- a/pkg/iter/entry_iterator.go +++ b/pkg/iter/entry_iterator.go @@ -91,7 +91,6 @@ func (i *mergeEntryIterator) closeEntry(e EntryIterator) { i.errs = append(i.errs, err) } util.LogError("closing iterator", e.Close) - } func (i *mergeEntryIterator) Push(ei EntryIterator) { @@ -294,8 +293,8 @@ func (i *entrySortIterator) 
closeEntry(e EntryIterator) { i.errs = append(i.errs, err) } util.LogError("closing iterator", e.Close) - } + func (i *entrySortIterator) Next() bool { ret := i.tree.Next() if !ret { @@ -810,3 +809,33 @@ func (it *peekingEntryIterator) Err() error { func (it *peekingEntryIterator) Close() error { return it.iter.Close() } + +type withCloseEntryIterator struct { + closeOnce sync.Once + closeFn func() error + errs []error + EntryIterator +} + +func (w *withCloseEntryIterator) Close() error { + w.closeOnce.Do(func() { + if err := w.EntryIterator.Close(); err != nil { + w.errs = append(w.errs, err) + } + if err := w.closeFn(); err != nil { + w.errs = append(w.errs, err) + } + }) + if len(w.errs) == 0 { + return nil + } + return util.MultiError(w.errs) +} + +func EntryIteratorWithClose(it EntryIterator, closeFn func() error) EntryIterator { + return &withCloseEntryIterator{ + closeOnce: sync.Once{}, + closeFn: closeFn, + EntryIterator: it, + } +}