Skip to content

Commit

Permalink
Merge branch 'main' into add-containerSecurityContext-to-statefulset-…
Browse files Browse the repository at this point in the history
…backend-sidecar
  • Loading branch information
openbob authored Sep 30, 2024
2 parents de3f780 + f52f8ad commit d438a1f
Show file tree
Hide file tree
Showing 56 changed files with 306 additions and 304 deletions.
6 changes: 3 additions & 3 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// TODO(chaudum): Make configurable via (per-tenant?) setting.
var blockCompressionAlgo = compression.EncNone
var defaultBlockCompressionCodec = compression.None

type Builder struct {
services.Service
Expand Down Expand Up @@ -336,7 +336,7 @@ func (b *Builder) processTask(
return nil, fmt.Errorf("failed to get client: %w", err)
}

blockEnc, err := compression.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant))
blockEnc, err := compression.ParseCodec(b.limits.BloomBlockEncoding(task.Tenant))
if err != nil {
return nil, fmt.Errorf("failed to parse block encoding: %w", err)
}
Expand Down Expand Up @@ -407,7 +407,7 @@ func (b *Builder) processTask(
blockCt++
blk := newBlocks.At()

built, err := bloomshipper.BlockFrom(blockCompressionAlgo, tenant, task.Table.Addr(), blk)
built, err := bloomshipper.BlockFrom(defaultBlockCompressionCodec, tenant, task.Table.Addr(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
if err = blk.Reader().Cleanup(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v2.Iterator[*v1.Ser

func TestSimpleBloomGenerator(t *testing.T) {
const maxBlockSize = 100 << 20 // 100MB
for _, enc := range []compression.Encoding{compression.EncNone, compression.EncGZIP, compression.EncSnappy} {
for _, enc := range []compression.Codec{compression.None, compression.GZIP, compression.Snappy} {
for _, tc := range []struct {
desc string
fromSchema, toSchema v1.BlockOptions
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (b *BloomTSDBStore) LoadTSDB(
}
defer data.Close()

decompressorPool := compression.GetReaderPool(compression.EncGZIP)
decompressorPool := compression.GetReaderPool(compression.GZIP)
decompressor, err := decompressorPool.GetReader(data)
if err != nil {
return nil, errors.Wrap(err, "failed to get decompressor")
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := v1.NewByteReader(indexBuf, bloomsBuf)

blockOpts := v1.NewBlockOptions(compression.EncNone, 0, 0)
blockOpts := v1.NewBlockOptions(compression.None, 0, 0)

builder, err := v1.NewBlockBuilder(blockOpts, writer)
if err != nil {
Expand All @@ -202,7 +202,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
block := v1.NewBlock(reader, v1.NewMetrics(nil))

buf := bytes.NewBuffer(nil)
if err := v1.TarCompress(ref.Encoding, buf, block.Reader()); err != nil {
if err := v1.TarCompress(ref.Codec, buf, block.Reader()); err != nil {
return bloomshipper.Block{}, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/chunkenc/dumb_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c *dumbChunk) Utilization() float64 {
return float64(len(c.entries)) / float64(tmpNumEntries)
}

func (c *dumbChunk) Encoding() compression.Encoding { return compression.EncNone }
func (c *dumbChunk) Encoding() compression.Codec { return compression.None }

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Chunk interface {
UncompressedSize() int
CompressedSize() int
Close() error
Encoding() compression.Encoding
Encoding() compression.Codec
Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type MemChunk struct {
head HeadBlock

format byte
encoding compression.Encoding
encoding compression.Codec
headFmt HeadBlockFmt

// compressed size of chunk. Set when chunk is cut or while decoding chunk from storage.
Expand Down Expand Up @@ -355,7 +355,7 @@ type entry struct {
}

// NewMemChunk returns a new in-mem chunk.
func NewMemChunk(chunkFormat byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
func NewMemChunk(chunkFormat byte, enc compression.Codec, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return newMemChunkWithFormat(chunkFormat, enc, head, blockSize, targetSize)
}

Expand All @@ -370,7 +370,7 @@ func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) {
}

// NewMemChunk returns a new in-mem chunk.
func newMemChunkWithFormat(format byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
func newMemChunkWithFormat(format byte, enc compression.Codec, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
panicIfInvalidFormat(format, head)

symbolizer := newSymbolizer()
Expand Down Expand Up @@ -414,10 +414,10 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
bc.format = version
switch version {
case ChunkFormatV1:
bc.encoding = compression.EncGZIP
bc.encoding = compression.GZIP
case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
// format v2+ has a byte for block encoding.
enc := compression.Encoding(db.byte())
enc := compression.Codec(db.byte())
if db.err() != nil {
return nil, errors.Wrap(db.err(), "verifying encoding")
}
Expand Down Expand Up @@ -777,7 +777,7 @@ func MemchunkFromCheckpoint(chk, head []byte, desiredIfNotUnordered HeadBlockFmt
}

// Encoding implements Chunk.
func (c *MemChunk) Encoding() compression.Encoding {
func (c *MemChunk) Encoding() compression.Codec {
return c.encoding
}

Expand Down Expand Up @@ -1173,7 +1173,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
// then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the
// chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former.
type encBlock struct {
enc compression.Encoding
enc compression.Codec
format byte
symbolizer *symbolizer
block
Expand Down
56 changes: 28 additions & 28 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ import (
"github.com/grafana/loki/v3/pkg/util/filter"
)

var testEncodings = []compression.Encoding{
compression.EncNone,
compression.EncGZIP,
compression.EncLZ4_64k,
compression.EncLZ4_256k,
compression.EncLZ4_1M,
compression.EncLZ4_4M,
compression.EncSnappy,
compression.EncFlate,
compression.EncZstd,
var testEncodings = []compression.Codec{
compression.None,
compression.GZIP,
compression.LZ4_64k,
compression.LZ4_256k,
compression.LZ4_1M,
compression.LZ4_4M,
compression.Snappy,
compression.Flate,
compression.Zstd,
}

var (
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestCorruptChunk(t *testing.T) {
func TestReadFormatV1(t *testing.T) {
t.Parallel()

c := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
fillChunk(c)
// overrides to v1 for testing that specific version.
c.format = ChunkFormatV1
Expand Down Expand Up @@ -391,7 +391,7 @@ func TestRoundtripV2(t *testing.T) {
}
}

func testNameWithFormats(enc compression.Encoding, chunkFormat byte, headBlockFmt HeadBlockFmt) string {
func testNameWithFormats(enc compression.Codec, chunkFormat byte, headBlockFmt HeadBlockFmt) string {
return fmt.Sprintf("encoding:%v chunkFormat:%v headBlockFmt:%v", enc, chunkFormat, headBlockFmt)
}

Expand Down Expand Up @@ -558,7 +558,7 @@ func TestChunkFilling(t *testing.T) {
func TestGZIPChunkTargetSize(t *testing.T) {
t.Parallel()

chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
chk := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)

lineSize := 512
entry := &logproto.Entry{
Expand Down Expand Up @@ -681,7 +681,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

tester(t, NewMemChunk(ChunkFormatV3, compression.EncGZIP, f, testBlockSize, testTargetSize))
tester(t, NewMemChunk(ChunkFormatV3, compression.GZIP, f, testBlockSize, testTargetSize))
})
}
}
Expand Down Expand Up @@ -726,7 +726,7 @@ func TestChunkSize(t *testing.T) {
}

func TestChunkStats(t *testing.T) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, testBlockSize, 0)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, DefaultTestHeadBlockFmt, testBlockSize, 0)
first := time.Now()
entry := &logproto.Entry{
Timestamp: first,
Expand Down Expand Up @@ -968,7 +968,7 @@ func BenchmarkBackwardIterator(b *testing.B) {
for _, bs := range testBlockSizes {
b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) {
b.ReportAllocs()
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, DefaultTestHeadBlockFmt, bs, testTargetSize)
_ = fillChunk(c)
b.ResetTimer()
for n := 0; n < b.N; n++ {
Expand Down Expand Up @@ -1082,7 +1082,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
func TestMemChunk_IteratorBounds(t *testing.T) {
createChunk := func() *MemChunk {
t.Helper()
c := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, 1e6, 1e6)
c := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, 1e6, 1e6)

if _, err := c.Append(&logproto.Entry{
Timestamp: time.Unix(0, 1),
Expand Down Expand Up @@ -1168,9 +1168,9 @@ func TestMemchunkLongLine(t *testing.T) {
func TestBytesWith(t *testing.T) {
t.Parallel()

exp, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil)
exp, err := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil)
require.Nil(t, err)
out, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
out, err := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
require.Nil(t, err)

require.Equal(t, exp, out)
Expand All @@ -1181,8 +1181,8 @@ func TestCheckpointEncoding(t *testing.T) {

blockSize, targetSize := 256*1024, 1500*1024
for _, f := range allPossibleFormats {
t.Run(testNameWithFormats(compression.EncSnappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) {
c := newMemChunkWithFormat(f.chunkFormat, compression.EncSnappy, f.headBlockFmt, blockSize, targetSize)
t.Run(testNameWithFormats(compression.Snappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) {
c := newMemChunkWithFormat(f.chunkFormat, compression.Snappy, f.headBlockFmt, blockSize, targetSize)

// add a few entries
for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -1267,7 +1267,7 @@ var (
func BenchmarkBufferedIteratorLabels(b *testing.B) {
for _, f := range HeadBlockFmts {
b.Run(f.String(), func(b *testing.B) {
c := NewMemChunk(ChunkFormatV3, compression.EncSnappy, f, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV3, compression.Snappy, f, testBlockSize, testTargetSize)
_ = fillChunk(c)

labelsSet := []labels.Labels{
Expand Down Expand Up @@ -1367,8 +1367,8 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {

func Test_HeadIteratorReverse(t *testing.T) {
for _, testData := range allPossibleFormats {
t.Run(testNameWithFormats(compression.EncSnappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) {
c := newMemChunkWithFormat(testData.chunkFormat, compression.EncSnappy, testData.headBlockFmt, testBlockSize, testTargetSize)
t.Run(testNameWithFormats(compression.Snappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) {
c := newMemChunkWithFormat(testData.chunkFormat, compression.Snappy, testData.headBlockFmt, testBlockSize, testTargetSize)
genEntry := func(i int64) *logproto.Entry {
return &logproto.Entry{
Timestamp: time.Unix(0, i),
Expand Down Expand Up @@ -1483,7 +1483,7 @@ func TestMemChunk_Rebound(t *testing.T) {
}

func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk {
chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
chk := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
for ; from.Before(through); from = from.Add(time.Second) {
_, err := chk.Append(&logproto.Entry{
Line: from.String(),
Expand Down Expand Up @@ -1604,7 +1604,7 @@ func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) {
}

func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time, withStructuredMetadata bool) *MemChunk {
chk := NewMemChunk(ChunkFormatV4, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
chk := NewMemChunk(ChunkFormatV4, compression.GZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
t.Logf("from : %v", from.String())
t.Logf("through: %v", through.String())
var structuredMetadata push.LabelsAdapter
Expand Down Expand Up @@ -1753,7 +1753,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
for _, format := range allPossibleFormats {
t.Run(fmt.Sprintf("chunk_v%d_head_%s", format.chunkFormat, format.headBlockFmt), func(t *testing.T) {
chk := newMemChunkWithFormat(format.chunkFormat, compression.EncNone, format.headBlockFmt, 1024, tc.targetSize)
chk := newMemChunkWithFormat(format.chunkFormat, compression.None, format.headBlockFmt, 1024, tc.targetSize)

chk.blocks = make([]block, tc.nBlocks)
chk.cutBlockSize = tc.cutBlockSize
Expand Down Expand Up @@ -2055,7 +2055,7 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) {
t.Run(fmt.Sprintf("chunkFormat:%v headBlockFmt:%v", format.chunkFormat, format.headBlockFmt), func(t *testing.T) {
for incorrectOffsetBlockNum := 0; incorrectOffsetBlockNum < 3; incorrectOffsetBlockNum++ {
t.Run(fmt.Sprintf("inorrect offset block: %d", incorrectOffsetBlockNum), func(t *testing.T) {
chk := NewMemChunk(format.chunkFormat, compression.EncNone, format.headBlockFmt, blockSize, testTargetSize)
chk := NewMemChunk(format.chunkFormat, compression.None, format.headBlockFmt, blockSize, testTargetSize)
ts := time.Now().Unix()
for i := 0; i < 3; i++ {
dup, err := chk.Append(&logproto.Entry{
Expand Down
16 changes: 8 additions & 8 deletions pkg/chunkenc/unordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
}

func TestUnorderedChunkIterators(t *testing.T) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for i := 0; i < 100; i++ {
// push in reverse order
dup, err := c.Append(&logproto.Entry{
Expand Down Expand Up @@ -497,11 +497,11 @@ func TestUnorderedChunkIterators(t *testing.T) {
}

func BenchmarkUnorderedRead(b *testing.B) {
legacy := NewMemChunk(ChunkFormatV3, compression.EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
legacy := NewMemChunk(ChunkFormatV3, compression.Snappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkClose(legacy, false)
ordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
ordered := NewMemChunk(ChunkFormatV3, compression.Snappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkClose(ordered, false)
unordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
unordered := NewMemChunk(ChunkFormatV3, compression.Snappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkRandomOrder(unordered, false)

tcs := []struct {
Expand Down Expand Up @@ -559,7 +559,7 @@ func BenchmarkUnorderedRead(b *testing.B) {
}

func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkRandomOrder(c, false)

ct := 0
Expand Down Expand Up @@ -596,7 +596,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
}

func chunkFrom(xs []logproto.Entry) ([]byte, error) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for _, x := range xs {
if _, err := c.Append(&x); err != nil {
return nil, err
Expand Down Expand Up @@ -656,7 +656,7 @@ func TestReorder(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for _, x := range tc.input {
dup, err := c.Append(&x)
require.False(t, dup)
Expand All @@ -675,7 +675,7 @@ func TestReorder(t *testing.T) {
}

func TestReorderAcrossBlocks(t *testing.T) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for _, batch := range [][]int{
// ensure our blocks have overlapping bounds and must be reordered
// before closing.
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunkenc/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func logprotoEntryWithStructuredMetadata(ts int64, line string, structuredMetada
}
}

func generateData(enc compression.Encoding, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) {
func generateData(enc compression.Codec, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) {
chunks := []Chunk{}
i := int64(0)
size := uint64(0)
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/deletion/delete_requests_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (t *deleteRequestsTable) uploadFile() error {
}()

err = t.db.View(func(tx *bbolt.Tx) (err error) {
gzipPool := compression.GetWriterPool(compression.EncGZIP)
gzipPool := compression.GetWriterPool(compression.GZIP)
compressedWriter := gzipPool.GetWriter(f)
defer gzipPool.PutWriter(compressedWriter)

Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (is *indexSet) upload() error {
}
}()

gzipPool := compression.GetWriterPool(compression.EncGZIP)
gzipPool := compression.GetWriterPool(compression.GZIP)
compressedWriter := gzipPool.GetWriter(f)
defer gzipPool.PutWriter(compressedWriter)

Expand Down
Loading

0 comments on commit d438a1f

Please sign in to comment.