From f52f8ad540637466858fd4a2139b62c38721083d Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 30 Sep 2024 08:55:51 +0200 Subject: [PATCH] chore: Clarify compression package (#14257) This PR renames "encoding" to "codec" in the compression package to remove the cognitive dissonance. It also removes the `Enc` prefix for codec identifiers, so that they adhere to Go's naming conventions, e.g. `compression.EncGZIP` becomes `compression.GZIP` when used in a different package. Signed-off-by: Christian Haudum --- pkg/bloombuild/builder/builder.go | 6 +- pkg/bloombuild/builder/spec_test.go | 2 +- pkg/bloombuild/common/tsdb.go | 2 +- pkg/bloombuild/planner/planner_test.go | 4 +- pkg/chunkenc/dumb_chunk.go | 2 +- pkg/chunkenc/interface.go | 2 +- pkg/chunkenc/memchunk.go | 14 +-- pkg/chunkenc/memchunk_test.go | 56 ++++++------ pkg/chunkenc/unordered_test.go | 16 ++-- pkg/chunkenc/util_test.go | 2 +- .../deletion/delete_requests_table.go | 2 +- pkg/compactor/index_set.go | 2 +- pkg/compactor/retention/retention_test.go | 2 +- pkg/compression/codec.go | 85 +++++++++++++++++++ .../{encoding_test.go => codec_test.go} | 6 +- pkg/compression/encoding.go | 83 ------------------ pkg/compression/fileext.go | 30 +++---- pkg/compression/pool.go | 24 +++--- pkg/compression/pool_test.go | 2 +- pkg/ingester/checkpoint_test.go | 2 +- pkg/ingester/chunk_test.go | 2 +- pkg/ingester/encoding_test.go | 4 +- pkg/ingester/flush_test.go | 2 +- pkg/ingester/ingester.go | 28 +++--- pkg/ingester/ingester_test.go | 18 ++-- pkg/ingester/stream_test.go | 2 +- pkg/storage/bloom/v1/archive.go | 4 +- pkg/storage/bloom/v1/archive_test.go | 24 +++--- pkg/storage/bloom/v1/bloom.go | 2 +- pkg/storage/bloom/v1/bloom_tokenizer_test.go | 8 +- pkg/storage/bloom/v1/builder.go | 2 +- pkg/storage/bloom/v1/builder_test.go | 22 ++--- pkg/storage/bloom/v1/fuse_test.go | 10 +-- pkg/storage/bloom/v1/schema.go | 8 +- pkg/storage/bloom/v1/test_util.go | 2 +- .../bloom/v1/versioned_builder_test.go | 4 
+- pkg/storage/chunk/cache/cache_test.go | 2 +- .../chunk/client/grpc/grpc_client_test.go | 2 +- .../chunk/client/testutils/testutils.go | 2 +- pkg/storage/chunk/fetcher/fetcher_test.go | 2 +- pkg/storage/hack/main.go | 4 +- pkg/storage/store_test.go | 2 +- .../stores/series/series_store_test.go | 2 +- pkg/storage/stores/series_store_write_test.go | 2 +- .../stores/shipper/bloomshipper/client.go | 12 +-- .../shipper/bloomshipper/client_test.go | 36 ++++---- .../shipper/bloomshipper/fetcher_test.go | 6 +- .../stores/shipper/bloomshipper/resolver.go | 10 +-- .../shipper/bloomshipper/resolver_test.go | 26 +++--- .../stores/shipper/bloomshipper/store_test.go | 4 +- .../indexshipper/boltdb/compactor/util.go | 2 +- .../shipper/indexshipper/uploads/index_set.go | 2 +- pkg/storage/util_test.go | 2 +- pkg/validation/limits.go | 2 +- pkg/validation/limits_test.go | 2 +- tools/tsdb/migrate-versions/main.go | 2 +- 56 files changed, 306 insertions(+), 304 deletions(-) create mode 100644 pkg/compression/codec.go rename pkg/compression/{encoding_test.go => codec_test.go} (84%) delete mode 100644 pkg/compression/encoding.go diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index fdeab9cf92c7..63950d7eadcb 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -34,7 +34,7 @@ import ( ) // TODO(chaudum): Make configurable via (per-tenant?) setting. 
-var blockCompressionAlgo = compression.EncNone +var defaultBlockCompressionCodec = compression.None type Builder struct { services.Service @@ -336,7 +336,7 @@ func (b *Builder) processTask( return nil, fmt.Errorf("failed to get client: %w", err) } - blockEnc, err := compression.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant)) + blockEnc, err := compression.ParseCodec(b.limits.BloomBlockEncoding(task.Tenant)) if err != nil { return nil, fmt.Errorf("failed to parse block encoding: %w", err) } @@ -407,7 +407,7 @@ func (b *Builder) processTask( blockCt++ blk := newBlocks.At() - built, err := bloomshipper.BlockFrom(blockCompressionAlgo, tenant, task.Table.Addr(), blk) + built, err := bloomshipper.BlockFrom(defaultBlockCompressionCodec, tenant, task.Table.Addr(), blk) if err != nil { level.Error(logger).Log("msg", "failed to build block", "err", err) if err = blk.Reader().Cleanup(); err != nil { diff --git a/pkg/bloombuild/builder/spec_test.go b/pkg/bloombuild/builder/spec_test.go index 330c0552b657..23afa5875474 100644 --- a/pkg/bloombuild/builder/spec_test.go +++ b/pkg/bloombuild/builder/spec_test.go @@ -115,7 +115,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v2.Iterator[*v1.Ser func TestSimpleBloomGenerator(t *testing.T) { const maxBlockSize = 100 << 20 // 100MB - for _, enc := range []compression.Encoding{compression.EncNone, compression.EncGZIP, compression.EncSnappy} { + for _, enc := range []compression.Codec{compression.None, compression.GZIP, compression.Snappy} { for _, tc := range []struct { desc string fromSchema, toSchema v1.BlockOptions diff --git a/pkg/bloombuild/common/tsdb.go b/pkg/bloombuild/common/tsdb.go index ea31767cca0b..e45ff4b153c7 100644 --- a/pkg/bloombuild/common/tsdb.go +++ b/pkg/bloombuild/common/tsdb.go @@ -102,7 +102,7 @@ func (b *BloomTSDBStore) LoadTSDB( } defer data.Close() - decompressorPool := compression.GetReaderPool(compression.EncGZIP) + decompressorPool := compression.GetReaderPool(compression.GZIP) 
decompressor, err := decompressorPool.GetReader(data) if err != nil { return nil, errors.Wrap(err, "failed to get decompressor") diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index ea780c98e8ee..9523a4579557 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -188,7 +188,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf) reader := v1.NewByteReader(indexBuf, bloomsBuf) - blockOpts := v1.NewBlockOptions(compression.EncNone, 0, 0) + blockOpts := v1.NewBlockOptions(compression.None, 0, 0) builder, err := v1.NewBlockBuilder(blockOpts, writer) if err != nil { @@ -202,7 +202,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { block := v1.NewBlock(reader, v1.NewMetrics(nil)) buf := bytes.NewBuffer(nil) - if err := v1.TarCompress(ref.Encoding, buf, block.Reader()); err != nil { + if err := v1.TarCompress(ref.Codec, buf, block.Reader()); err != nil { return bloomshipper.Block{}, err } diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index e28298605118..b95f92c2fdfd 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -70,7 +70,7 @@ func (c *dumbChunk) Utilization() float64 { return float64(len(c.entries)) / float64(tmpNumEntries) } -func (c *dumbChunk) Encoding() compression.Encoding { return compression.EncNone } +func (c *dumbChunk) Encoding() compression.Codec { return compression.None } // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). 
diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 057fc8b985ad..e894b687236c 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -68,7 +68,7 @@ type Chunk interface { UncompressedSize() int CompressedSize() int Close() error - Encoding() compression.Encoding + Encoding() compression.Codec Rebound(start, end time.Time, filter filter.Func) (Chunk, error) } diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 03f33b817672..790210d3af8b 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -132,7 +132,7 @@ type MemChunk struct { head HeadBlock format byte - encoding compression.Encoding + encoding compression.Codec headFmt HeadBlockFmt // compressed size of chunk. Set when chunk is cut or while decoding chunk from storage. @@ -355,7 +355,7 @@ type entry struct { } // NewMemChunk returns a new in-mem chunk. -func NewMemChunk(chunkFormat byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { +func NewMemChunk(chunkFormat byte, enc compression.Codec, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { return newMemChunkWithFormat(chunkFormat, enc, head, blockSize, targetSize) } @@ -370,7 +370,7 @@ func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) { } // NewMemChunk returns a new in-mem chunk. -func newMemChunkWithFormat(format byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { +func newMemChunkWithFormat(format byte, enc compression.Codec, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { panicIfInvalidFormat(format, head) symbolizer := newSymbolizer() @@ -414,10 +414,10 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me bc.format = version switch version { case ChunkFormatV1: - bc.encoding = compression.EncGZIP + bc.encoding = compression.GZIP case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4: // format v2+ has a byte for block encoding. 
- enc := compression.Encoding(db.byte()) + enc := compression.Codec(db.byte()) if db.err() != nil { return nil, errors.Wrap(db.err(), "verifying encoding") } @@ -777,7 +777,7 @@ func MemchunkFromCheckpoint(chk, head []byte, desiredIfNotUnordered HeadBlockFmt } // Encoding implements Chunk. -func (c *MemChunk) Encoding() compression.Encoding { +func (c *MemChunk) Encoding() compression.Codec { return c.encoding } @@ -1173,7 +1173,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err // then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the // chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former. type encBlock struct { - enc compression.Encoding + enc compression.Codec format byte symbolizer *symbolizer block diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 987a5d88b286..24d4ab2d2c2c 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -32,16 +32,16 @@ import ( "github.com/grafana/loki/v3/pkg/util/filter" ) -var testEncodings = []compression.Encoding{ - compression.EncNone, - compression.EncGZIP, - compression.EncLZ4_64k, - compression.EncLZ4_256k, - compression.EncLZ4_1M, - compression.EncLZ4_4M, - compression.EncSnappy, - compression.EncFlate, - compression.EncZstd, +var testEncodings = []compression.Codec{ + compression.None, + compression.GZIP, + compression.LZ4_64k, + compression.LZ4_256k, + compression.LZ4_1M, + compression.LZ4_4M, + compression.Snappy, + compression.Flate, + compression.Zstd, } var ( @@ -299,7 +299,7 @@ func TestCorruptChunk(t *testing.T) { func TestReadFormatV1(t *testing.T) { t.Parallel() - c := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) fillChunk(c) // overrides to v1 for testing that specific 
version. c.format = ChunkFormatV1 @@ -391,7 +391,7 @@ func TestRoundtripV2(t *testing.T) { } } -func testNameWithFormats(enc compression.Encoding, chunkFormat byte, headBlockFmt HeadBlockFmt) string { +func testNameWithFormats(enc compression.Codec, chunkFormat byte, headBlockFmt HeadBlockFmt) string { return fmt.Sprintf("encoding:%v chunkFormat:%v headBlockFmt:%v", enc, chunkFormat, headBlockFmt) } @@ -558,7 +558,7 @@ func TestChunkFilling(t *testing.T) { func TestGZIPChunkTargetSize(t *testing.T) { t.Parallel() - chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) + chk := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) lineSize := 512 entry := &logproto.Entry{ @@ -681,7 +681,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - tester(t, NewMemChunk(ChunkFormatV3, compression.EncGZIP, f, testBlockSize, testTargetSize)) + tester(t, NewMemChunk(ChunkFormatV3, compression.GZIP, f, testBlockSize, testTargetSize)) }) } } @@ -726,7 +726,7 @@ func TestChunkSize(t *testing.T) { } func TestChunkStats(t *testing.T) { - c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, testBlockSize, 0) + c := NewMemChunk(ChunkFormatV4, compression.Snappy, DefaultTestHeadBlockFmt, testBlockSize, 0) first := time.Now() entry := &logproto.Entry{ Timestamp: first, @@ -968,7 +968,7 @@ func BenchmarkBackwardIterator(b *testing.B) { for _, bs := range testBlockSizes { b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) { b.ReportAllocs() - c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.Snappy, DefaultTestHeadBlockFmt, bs, testTargetSize) _ = fillChunk(c) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -1082,7 +1082,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) { func TestMemChunk_IteratorBounds(t 
*testing.T) { createChunk := func() *MemChunk { t.Helper() - c := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, 1e6, 1e6) + c := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, 1e6, 1e6) if _, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(0, 1), @@ -1168,9 +1168,9 @@ func TestMemchunkLongLine(t *testing.T) { func TestBytesWith(t *testing.T) { t.Parallel() - exp, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil) + exp, err := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil) require.Nil(t, err) - out, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3}) + out, err := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3}) require.Nil(t, err) require.Equal(t, exp, out) @@ -1181,8 +1181,8 @@ func TestCheckpointEncoding(t *testing.T) { blockSize, targetSize := 256*1024, 1500*1024 for _, f := range allPossibleFormats { - t.Run(testNameWithFormats(compression.EncSnappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) { - c := newMemChunkWithFormat(f.chunkFormat, compression.EncSnappy, f.headBlockFmt, blockSize, targetSize) + t.Run(testNameWithFormats(compression.Snappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) { + c := newMemChunkWithFormat(f.chunkFormat, compression.Snappy, f.headBlockFmt, blockSize, targetSize) // add a few entries for i := 0; i < 5; i++ { @@ -1267,7 +1267,7 @@ var ( func BenchmarkBufferedIteratorLabels(b *testing.B) { for _, f := range HeadBlockFmts { b.Run(f.String(), func(b *testing.B) { - c := NewMemChunk(ChunkFormatV3, compression.EncSnappy, f, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV3, compression.Snappy, f, testBlockSize, testTargetSize) _ = fillChunk(c) 
labelsSet := []labels.Labels{ @@ -1367,8 +1367,8 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) { func Test_HeadIteratorReverse(t *testing.T) { for _, testData := range allPossibleFormats { - t.Run(testNameWithFormats(compression.EncSnappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { - c := newMemChunkWithFormat(testData.chunkFormat, compression.EncSnappy, testData.headBlockFmt, testBlockSize, testTargetSize) + t.Run(testNameWithFormats(compression.Snappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { + c := newMemChunkWithFormat(testData.chunkFormat, compression.Snappy, testData.headBlockFmt, testBlockSize, testTargetSize) genEntry := func(i int64) *logproto.Entry { return &logproto.Entry{ Timestamp: time.Unix(0, i), @@ -1483,7 +1483,7 @@ func TestMemChunk_Rebound(t *testing.T) { } func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk { - chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) + chk := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) for ; from.Before(through); from = from.Add(time.Second) { _, err := chk.Append(&logproto.Entry{ Line: from.String(), @@ -1604,7 +1604,7 @@ func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) { } func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time, withStructuredMetadata bool) *MemChunk { - chk := NewMemChunk(ChunkFormatV4, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) + chk := NewMemChunk(ChunkFormatV4, compression.GZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) t.Logf("from : %v", from.String()) t.Logf("through: %v", through.String()) var structuredMetadata push.LabelsAdapter @@ -1753,7 +1753,7 @@ func TestMemChunk_SpaceFor(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { for _, format := range allPossibleFormats { t.Run(fmt.Sprintf("chunk_v%d_head_%s", 
format.chunkFormat, format.headBlockFmt), func(t *testing.T) { - chk := newMemChunkWithFormat(format.chunkFormat, compression.EncNone, format.headBlockFmt, 1024, tc.targetSize) + chk := newMemChunkWithFormat(format.chunkFormat, compression.None, format.headBlockFmt, 1024, tc.targetSize) chk.blocks = make([]block, tc.nBlocks) chk.cutBlockSize = tc.cutBlockSize @@ -2055,7 +2055,7 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) { t.Run(fmt.Sprintf("chunkFormat:%v headBlockFmt:%v", format.chunkFormat, format.headBlockFmt), func(t *testing.T) { for incorrectOffsetBlockNum := 0; incorrectOffsetBlockNum < 3; incorrectOffsetBlockNum++ { t.Run(fmt.Sprintf("inorrect offset block: %d", incorrectOffsetBlockNum), func(t *testing.T) { - chk := NewMemChunk(format.chunkFormat, compression.EncNone, format.headBlockFmt, blockSize, testTargetSize) + chk := NewMemChunk(format.chunkFormat, compression.None, format.headBlockFmt, blockSize, testTargetSize) ts := time.Now().Unix() for i := 0; i < 3; i++ { dup, err := chk.Append(&logproto.Entry{ diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index fb341aaa8db9..509a34673fda 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -451,7 +451,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) { } func TestUnorderedChunkIterators(t *testing.T) { - c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for i := 0; i < 100; i++ { // push in reverse order dup, err := c.Append(&logproto.Entry{ @@ -497,11 +497,11 @@ func TestUnorderedChunkIterators(t *testing.T) { } func BenchmarkUnorderedRead(b *testing.B) { - legacy := NewMemChunk(ChunkFormatV3, compression.EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize) + legacy := NewMemChunk(ChunkFormatV3, compression.Snappy, 
OrderedHeadBlockFmt, testBlockSize, testTargetSize) fillChunkClose(legacy, false) - ordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + ordered := NewMemChunk(ChunkFormatV3, compression.Snappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) fillChunkClose(ordered, false) - unordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + unordered := NewMemChunk(ChunkFormatV3, compression.Snappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) fillChunkRandomOrder(unordered, false) tcs := []struct { @@ -559,7 +559,7 @@ func BenchmarkUnorderedRead(b *testing.B) { } func TestUnorderedIteratorCountsAllEntries(t *testing.T) { - c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) fillChunkRandomOrder(c, false) ct := 0 @@ -596,7 +596,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) { } func chunkFrom(xs []logproto.Entry) ([]byte, error) { - c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for _, x := range xs { if _, err := c.Append(&x); err != nil { return nil, err @@ -656,7 +656,7 @@ func TestReorder(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for _, x := range tc.input { dup, err := c.Append(&x) require.False(t, dup) @@ -675,7 +675,7 @@ func 
TestReorder(t *testing.T) { } func TestReorderAcrossBlocks(t *testing.T) { - c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for _, batch := range [][]int{ // ensure our blocks have overlapping bounds and must be reordered // before closing. diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index 0d75273d6c81..bcbe9cc1e8be 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -24,7 +24,7 @@ func logprotoEntryWithStructuredMetadata(ts int64, line string, structuredMetada } } -func generateData(enc compression.Encoding, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) { +func generateData(enc compression.Codec, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) { chunks := []Chunk{} i := int64(0) size := uint64(0) diff --git a/pkg/compactor/deletion/delete_requests_table.go b/pkg/compactor/deletion/delete_requests_table.go index 7d4c5cf4d254..ed748097e5ba 100644 --- a/pkg/compactor/deletion/delete_requests_table.go +++ b/pkg/compactor/deletion/delete_requests_table.go @@ -117,7 +117,7 @@ func (t *deleteRequestsTable) uploadFile() error { }() err = t.db.View(func(tx *bbolt.Tx) (err error) { - gzipPool := compression.GetWriterPool(compression.EncGZIP) + gzipPool := compression.GetWriterPool(compression.GZIP) compressedWriter := gzipPool.GetWriter(f) defer gzipPool.PutWriter(compressedWriter) diff --git a/pkg/compactor/index_set.go b/pkg/compactor/index_set.go index 76b5546a9628..481d6aa39937 100644 --- a/pkg/compactor/index_set.go +++ b/pkg/compactor/index_set.go @@ -229,7 +229,7 @@ func (is *indexSet) upload() error { } }() - gzipPool := compression.GetWriterPool(compression.EncGZIP) + gzipPool := compression.GetWriterPool(compression.GZIP) compressedWriter := gzipPool.GetWriter(f) defer 
gzipPool.PutWriter(compressedWriter) diff --git a/pkg/compactor/retention/retention_test.go b/pkg/compactor/retention/retention_test.go index 32dac3293a09..b68d9e39f42c 100644 --- a/pkg/compactor/retention/retention_test.go +++ b/pkg/compactor/retention/retention_test.go @@ -279,7 +279,7 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := ingesterclient.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize) + chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { dup, err := chunkEnc.Append(&logproto.Entry{ diff --git a/pkg/compression/codec.go b/pkg/compression/codec.go new file mode 100644 index 000000000000..84038e50193a --- /dev/null +++ b/pkg/compression/codec.go @@ -0,0 +1,85 @@ +package compression + +import ( + "fmt" + "strings" +) + +// Codec identifies an available compression codec. +type Codec byte + +// The different available codecs +// Make sure to preserve the order, as the numeric values are serialized! 
+// +//nolint:revive +const ( + None Codec = iota + GZIP + Dumb // not supported + LZ4_64k + Snappy + LZ4_256k + LZ4_1M + LZ4_4M + Flate + Zstd +) + +var supportedCodecs = []Codec{ + None, + GZIP, + LZ4_64k, + Snappy, + LZ4_256k, + LZ4_1M, + LZ4_4M, + Flate, + Zstd, +} + +func (e Codec) String() string { + switch e { + case GZIP: + return "gzip" + case None: + return "none" + case LZ4_64k: + return "lz4-64k" + case LZ4_256k: + return "lz4-256k" + case LZ4_1M: + return "lz4-1M" + case LZ4_4M: + return "lz4" + case Snappy: + return "snappy" + case Flate: + return "flate" + case Zstd: + return "zstd" + default: + return "unknown" + } +} + +// ParseCodec parses a chunk encoding (compression codec) by its name. +func ParseCodec(enc string) (Codec, error) { + for _, e := range supportedCodecs { + if strings.EqualFold(e.String(), enc) { + return e, nil + } + } + return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedCodecs()) +} + +// SupportedCodecs returns the list of supported Encoding. 
+func SupportedCodecs() string { + var sb strings.Builder + for i := range supportedCodecs { + sb.WriteString(supportedCodecs[i].String()) + if i != len(supportedCodecs)-1 { + sb.WriteString(", ") + } + } + return sb.String() +} diff --git a/pkg/compression/encoding_test.go b/pkg/compression/codec_test.go similarity index 84% rename from pkg/compression/encoding_test.go rename to pkg/compression/codec_test.go index d67323ebb2d4..7d25b53380d6 100644 --- a/pkg/compression/encoding_test.go +++ b/pkg/compression/codec_test.go @@ -5,15 +5,15 @@ import "testing" func TestParseEncoding(t *testing.T) { tests := []struct { enc string - want Encoding + want Codec wantErr bool }{ - {"gzip", EncGZIP, false}, + {"gzip", GZIP, false}, {"bad", 0, true}, } for _, tt := range tests { t.Run(tt.enc, func(t *testing.T) { - got, err := ParseEncoding(tt.enc) + got, err := ParseCodec(tt.enc) if (err != nil) != tt.wantErr { t.Errorf("ParseEncoding() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/compression/encoding.go b/pkg/compression/encoding.go deleted file mode 100644 index ecef31f09325..000000000000 --- a/pkg/compression/encoding.go +++ /dev/null @@ -1,83 +0,0 @@ -package compression - -import ( - "fmt" - "strings" -) - -// Encoding identifies an available compression type. -type Encoding byte - -// The different available encodings. -// Make sure to preserve the order, as the numeric values are serialized! 
-const ( - EncNone Encoding = iota - EncGZIP - EncDumb // not supported - EncLZ4_64k - EncSnappy - EncLZ4_256k - EncLZ4_1M - EncLZ4_4M - EncFlate - EncZstd -) - -var supportedEncoding = []Encoding{ - EncNone, - EncGZIP, - EncLZ4_64k, - EncSnappy, - EncLZ4_256k, - EncLZ4_1M, - EncLZ4_4M, - EncFlate, - EncZstd, -} - -func (e Encoding) String() string { - switch e { - case EncGZIP: - return "gzip" - case EncNone: - return "none" - case EncLZ4_64k: - return "lz4-64k" - case EncLZ4_256k: - return "lz4-256k" - case EncLZ4_1M: - return "lz4-1M" - case EncLZ4_4M: - return "lz4" - case EncSnappy: - return "snappy" - case EncFlate: - return "flate" - case EncZstd: - return "zstd" - default: - return "unknown" - } -} - -// ParseEncoding parses an chunk encoding (compression algorithm) by its name. -func ParseEncoding(enc string) (Encoding, error) { - for _, e := range supportedEncoding { - if strings.EqualFold(e.String(), enc) { - return e, nil - } - } - return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding()) -} - -// SupportedEncoding returns the list of supported Encoding. 
-func SupportedEncoding() string { - var sb strings.Builder - for i := range supportedEncoding { - sb.WriteString(supportedEncoding[i].String()) - if i != len(supportedEncoding)-1 { - sb.WriteString(", ") - } - } - return sb.String() -} diff --git a/pkg/compression/fileext.go b/pkg/compression/fileext.go index 8cd09c392d08..d6cfa4431b72 100644 --- a/pkg/compression/fileext.go +++ b/pkg/compression/fileext.go @@ -11,39 +11,39 @@ const ( ExtZstd = ".zst" ) -func ToFileExtension(e Encoding) string { +func ToFileExtension(e Codec) string { switch e { - case EncNone: + case None: return ExtNone - case EncGZIP: + case GZIP: return ExtGZIP - case EncLZ4_64k, EncLZ4_256k, EncLZ4_1M, EncLZ4_4M: + case LZ4_64k, LZ4_256k, LZ4_1M, LZ4_4M: return ExtLZ4 - case EncSnappy: + case Snappy: return ExtSnappy - case EncFlate: + case Flate: return ExtFlate - case EncZstd: + case Zstd: return ExtZstd default: - panic(fmt.Sprintf("invalid encoding: %d, supported: %s", e, SupportedEncoding())) + panic(fmt.Sprintf("invalid codec: %d, supported: %s", e, SupportedCodecs())) } } -func FromFileExtension(ext string) Encoding { +func FromFileExtension(ext string) Codec { switch ext { case ExtNone: - return EncNone + return None case ExtGZIP: - return EncGZIP + return GZIP case ExtLZ4: - return EncLZ4_4M + return LZ4_4M case ExtSnappy: - return EncSnappy + return Snappy case ExtFlate: - return EncFlate + return Flate case ExtZstd: - return EncZstd + return Zstd default: panic(fmt.Sprintf("invalid file extension: %s", ext)) } diff --git a/pkg/compression/pool.go b/pkg/compression/pool.go index b68ff7de47b1..564287591651 100644 --- a/pkg/compression/pool.go +++ b/pkg/compression/pool.go @@ -51,33 +51,33 @@ var ( noop = NoopPool{} ) -func GetWriterPool(enc Encoding) WriterPool { +func GetWriterPool(enc Codec) WriterPool { return GetPool(enc).(WriterPool) } -func GetReaderPool(enc Encoding) ReaderPool { +func GetReaderPool(enc Codec) ReaderPool { return GetPool(enc).(ReaderPool) } -func GetPool(enc 
Encoding) ReaderWriterPool { +func GetPool(enc Codec) ReaderWriterPool { switch enc { - case EncGZIP: + case GZIP: return &gzip - case EncLZ4_64k: + case LZ4_64k: return &lz4_64k - case EncLZ4_256k: + case LZ4_256k: return &lz4_256k - case EncLZ4_1M: + case LZ4_1M: return &lz4_1M - case EncLZ4_4M: + case LZ4_4M: return &lz4_4M - case EncSnappy: + case Snappy: return &snappy - case EncNone: + case None: return &noop - case EncFlate: + case Flate: return &flate - case EncZstd: + case Zstd: return &zstd default: panic("unknown encoding") diff --git a/pkg/compression/pool_test.go b/pkg/compression/pool_test.go index b39bbe0ad6f4..fc5ba08a0d48 100644 --- a/pkg/compression/pool_test.go +++ b/pkg/compression/pool_test.go @@ -15,7 +15,7 @@ import ( ) func TestPool(t *testing.T) { - for _, enc := range supportedEncoding { + for _, enc := range supportedCodecs { enc := enc t.Run(enc.String(), func(t *testing.T) { var wg sync.WaitGroup diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 1639125390a0..9f1db601bb72 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -566,7 +566,7 @@ func buildChunks(t testing.TB, size int) []Chunk { for i := 0; i < size; i++ { // build chunks of 256k blocks, 1.5MB target size. Same as default config. 
- c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV3, compression.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024) + c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV3, compression.GZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024) fillChunk(t, c) descs = append(descs, chunkDesc{ chunk: c, diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index 961b256ea58c..6d1f1735469f 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -50,7 +50,7 @@ func TestIterator(t *testing.T) { }{ {"dumbChunk", chunkenc.NewDumbChunk}, {"gzipChunk", func() chunkenc.Chunk { - return chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) + return chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) }}, } { t.Run(chk.name, func(t *testing.T) { diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index ee2ad1d8f681..3a730324e5c3 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -59,7 +59,7 @@ func Test_EncodingChunks(t *testing.T) { t.Run(fmt.Sprintf("%v-%s", close, tc.desc), func(t *testing.T) { conf := tc.conf - c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) + c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) fillChunk(t, c) if close { require.Nil(t, c.Close()) @@ -122,7 +122,7 @@ func Test_EncodingChunks(t *testing.T) { func Test_EncodingCheckpoint(t *testing.T) { conf := dummyConf() - c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) + c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, 
compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) dup, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(1, 0), Line: "hi there", diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index f01fb02e8730..f4251747115a 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -189,7 +189,7 @@ func buildChunkDecs(t testing.TB) []*chunkDesc { for i := range res { res[i] = &chunkDesc{ closed: true, - chunk: chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize), + chunk: chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize), } fillChunk(t, res[i].chunk) require.NoError(t, res[i].chunk.Close()) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7776b9097f08..529336a58561 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -89,18 +89,18 @@ var ( type Config struct { LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."` - ConcurrentFlushes int `yaml:"concurrent_flushes"` - FlushCheckPeriod time.Duration `yaml:"flush_check_period"` - FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"` - FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` - RetainPeriod time.Duration `yaml:"chunk_retain_period"` - MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` - BlockSize int `yaml:"chunk_block_size"` - TargetChunkSize int `yaml:"chunk_target_size"` - ChunkEncoding string `yaml:"chunk_encoding"` - parsedEncoding compression.Encoding `yaml:"-"` // placeholder for validated encoding - MaxChunkAge time.Duration `yaml:"max_chunk_age"` - AutoForgetUnhealthy bool 
`yaml:"autoforget_unhealthy"` + ConcurrentFlushes int `yaml:"concurrent_flushes"` + FlushCheckPeriod time.Duration `yaml:"flush_check_period"` + FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"` + FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` + RetainPeriod time.Duration `yaml:"chunk_retain_period"` + MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` + BlockSize int `yaml:"chunk_block_size"` + TargetChunkSize int `yaml:"chunk_target_size"` + ChunkEncoding string `yaml:"chunk_encoding"` + parsedEncoding compression.Codec `yaml:"-"` // placeholder for validated encoding + MaxChunkAge time.Duration `yaml:"max_chunk_age"` + AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"` // Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments. SyncPeriod time.Duration `yaml:"sync_period"` @@ -150,7 +150,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.") f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.") f.IntVar(&cfg.TargetChunkSize, "ingester.chunk-target-size", 1572864, "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). 
A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.") // 1.5 MB - f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", compression.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedEncoding())) + f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", compression.GZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedCodecs())) f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 1*time.Hour, "Parameters used to synchronize ingesters to cut chunks at the same moment. Sync period is used to roll over incoming entry to a new chunk. If chunk's utilization isn't high enough (eg. less than 50% when sync_min_utilization is set to 0.5), then this chunk rollover doesn't happen.") f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0.1, "Minimum utilization of chunk when doing synchronization.") f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "The maximum number of errors a stream will report to the user when a push fails. 
0 to make unlimited.") @@ -164,7 +164,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } func (cfg *Config) Validate() error { - enc, err := compression.ParseEncoding(cfg.ChunkEncoding) + enc, err := compression.ParseCodec(cfg.ChunkEncoding) if err != nil { return err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index a9108c52c2a1..9074580b4eb4 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -697,7 +697,7 @@ func TestValidate(t *testing.T) { }{ { in: Config{ - ChunkEncoding: compression.EncGZIP.String(), + ChunkEncoding: compression.GZIP.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -708,7 +708,7 @@ func TestValidate(t *testing.T) { MaxChunkAge: time.Minute, }, expected: Config{ - ChunkEncoding: compression.EncGZIP.String(), + ChunkEncoding: compression.GZIP.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -717,12 +717,12 @@ func TestValidate(t *testing.T) { FlushOpTimeout: 15 * time.Second, IndexShards: index.DefaultIndexShards, MaxChunkAge: time.Minute, - parsedEncoding: compression.EncGZIP, + parsedEncoding: compression.GZIP, }, }, { in: Config{ - ChunkEncoding: compression.EncSnappy.String(), + ChunkEncoding: compression.Snappy.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -732,7 +732,7 @@ func TestValidate(t *testing.T) { IndexShards: index.DefaultIndexShards, }, expected: Config{ - ChunkEncoding: compression.EncSnappy.String(), + ChunkEncoding: compression.Snappy.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -740,7 +740,7 @@ func TestValidate(t *testing.T) { }, FlushOpTimeout: 15 * time.Second, IndexShards: index.DefaultIndexShards, - parsedEncoding: compression.EncSnappy, + parsedEncoding: compression.Snappy, }, }, { @@ -758,7 +758,7 
@@ func TestValidate(t *testing.T) { }, { in: Config{ - ChunkEncoding: compression.EncGZIP.String(), + ChunkEncoding: compression.GZIP.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -771,7 +771,7 @@ func TestValidate(t *testing.T) { }, { in: Config{ - ChunkEncoding: compression.EncGZIP.String(), + ChunkEncoding: compression.GZIP.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -784,7 +784,7 @@ func TestValidate(t *testing.T) { }, { in: Config{ - ChunkEncoding: compression.EncGZIP.String(), + ChunkEncoding: compression.GZIP.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 9ac86fbd3015..03e0ca976628 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -277,7 +277,7 @@ func TestStreamIterator(t *testing.T) { {"gzipChunk", func() *chunkenc.MemChunk { chunkfmt, headfmt := defaultChunkFormat(t) - return chunkenc.NewMemChunk(chunkfmt, compression.EncGZIP, headfmt, 256*1024, 0) + return chunkenc.NewMemChunk(chunkfmt, compression.GZIP, headfmt, 256*1024, 0) }}, } { t.Run(chk.name, func(t *testing.T) { diff --git a/pkg/storage/bloom/v1/archive.go b/pkg/storage/bloom/v1/archive.go index fce83d69e41d..a7b7232f230d 100644 --- a/pkg/storage/bloom/v1/archive.go +++ b/pkg/storage/bloom/v1/archive.go @@ -21,7 +21,7 @@ type TarEntry struct { Body io.ReadSeeker } -func TarCompress(enc compression.Encoding, dst io.Writer, reader BlockReader) error { +func TarCompress(enc compression.Codec, dst io.Writer, reader BlockReader) error { comprPool := compression.GetWriterPool(enc) comprWriter := comprPool.GetWriter(dst) defer func() { @@ -61,7 +61,7 @@ func Tar(dst io.Writer, reader BlockReader) error { return itr.Err() } -func UnTarCompress(enc compression.Encoding, dst string, r io.Reader) error { +func 
UnTarCompress(enc compression.Codec, dst string, r io.Reader) error { comprPool := compression.GetReaderPool(enc) comprReader, err := comprPool.GetReader(r) if err != nil { diff --git a/pkg/storage/bloom/v1/archive_test.go b/pkg/storage/bloom/v1/archive_test.go index b7857a4b5ed1..f91039cac369 100644 --- a/pkg/storage/bloom/v1/archive_test.go +++ b/pkg/storage/bloom/v1/archive_test.go @@ -24,7 +24,7 @@ func TestArchive(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncNone, + encoding: compression.None, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -82,17 +82,17 @@ func TestArchive(t *testing.T) { func TestArchiveCompression(t *testing.T) { t.Parallel() for _, tc := range []struct { - enc compression.Encoding + enc compression.Codec }{ - {compression.EncNone}, - {compression.EncGZIP}, - {compression.EncSnappy}, - {compression.EncLZ4_64k}, - {compression.EncLZ4_256k}, - {compression.EncLZ4_1M}, - {compression.EncLZ4_4M}, - {compression.EncFlate}, - {compression.EncZstd}, + {compression.None}, + {compression.GZIP}, + {compression.Snappy}, + {compression.LZ4_64k}, + {compression.LZ4_256k}, + {compression.LZ4_1M}, + {compression.LZ4_4M}, + {compression.Flate}, + {compression.Zstd}, } { t.Run(tc.enc.String(), func(t *testing.T) { // for writing files to two dirs for comparison and ensuring they're equal @@ -106,7 +106,7 @@ func TestArchiveCompression(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncNone, + encoding: compression.None, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index b77af18d1ace..82c85bd9f441 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -316,7 +316,7 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc mempool.Allocator, return nil, false, errors.Wrap(err, "seeking to bloom page") } - if b.schema.encoding == 
compression.EncNone { + if b.schema.encoding == compression.None { res, err = LazyDecodeBloomPageNoCompression(r, alloc, page) } else { res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index 79eb74033dd7..f4c7ec7d831c 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -38,7 +38,7 @@ func TestTokenizerPopulate(t *testing.T) { {Name: "pod", Value: "loki-1"}, {Name: "trace_id", Value: "3bef3c91643bde73"}, } - memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) + memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, @@ -83,7 +83,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { {Name: "pod", Value: "loki-1"}, {Name: "trace_id", Value: "3bef3c91643bde73"}, } - memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) + memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, @@ -120,7 +120,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { } func chunkRefItrFromMetadata(metadata ...push.LabelsAdapter) (iter.EntryIterator, error) { - memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) + memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, 
chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) for i, md := range metadata { if _, err := memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, int64(i)), @@ -205,7 +205,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) { sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) - memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) + memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: "", diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 664eb60cd596..f4bec3b2eaad 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -66,7 +66,7 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) { enc.PutBE64(b.BlockSize) } -func NewBlockOptions(enc compression.Encoding, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { +func NewBlockOptions(enc compression.Codec, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { opts := NewBlockOptionsFromSchema(Schema{ version: CurrentSchemaVersion, encoding: enc, diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index a2682921930f..81c367df9c81 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -15,12 +15,12 @@ import ( "github.com/grafana/loki/v3/pkg/util/mempool" ) -var blockEncodings = []compression.Encoding{ - compression.EncNone, - compression.EncGZIP, - compression.EncSnappy, - compression.EncLZ4_256k, - compression.EncZstd, +var blockEncodings = []compression.Codec{ + compression.None, + compression.GZIP, + compression.Snappy, + compression.LZ4_256k, + compression.Zstd, } func TestBlockOptions_RoundTrip(t *testing.T) { @@ -28,7 +28,7 @@ func TestBlockOptions_RoundTrip(t *testing.T) { 
opts := BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.Snappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -201,7 +201,7 @@ func TestMergeBuilder(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.Snappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -298,7 +298,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.Snappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -395,7 +395,7 @@ func TestBlockReset(t *testing.T) { schema := Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.Snappy, } builder, err := NewBlockBuilder( @@ -451,7 +451,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, // test with different encodings? + encoding: compression.Snappy, // test with different encodings? 
}, SeriesPageSize: 100, BloomPageSize: 10 << 10, diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index ec4f575fc22a..4a22b91e7009 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -60,7 +60,7 @@ func TestFusedQuerier(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.Snappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -147,7 +147,7 @@ func TestFusedQuerier_MultiPage(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.Snappy, }, SeriesPageSize: 100, BloomPageSize: 10, // So we force one bloom per page @@ -296,7 +296,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.Snappy, }, SeriesPageSize: 100, BloomPageSize: 10, // So we force one series per page @@ -354,7 +354,7 @@ func TestFusedQuerier_SkipsEmptyBlooms(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncNone, + encoding: compression.None, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -415,7 +415,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.Snappy, }, SeriesPageSize: 256 << 10, // 256k BloomPageSize: 1 << 20, // 1MB diff --git a/pkg/storage/bloom/v1/schema.go b/pkg/storage/bloom/v1/schema.go index 954c96f757d6..7c0271434b2b 100644 --- a/pkg/storage/bloom/v1/schema.go +++ b/pkg/storage/bloom/v1/schema.go @@ -39,13 +39,13 @@ var ( type Schema struct { version Version - encoding compression.Encoding + encoding compression.Codec } func NewSchema() Schema { return Schema{ version: CurrentSchemaVersion, - encoding: 
compression.EncNone, + encoding: compression.None, } } @@ -105,8 +105,8 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error { return errors.Errorf("invalid version. expected %d, got %d", 3, s.version) } - s.encoding = compression.Encoding(dec.Byte()) - if _, err := compression.ParseEncoding(s.encoding.String()); err != nil { + s.encoding = compression.Codec(dec.Byte()) + if _, err := compression.ParseCodec(s.encoding.String()); err != nil { return errors.Wrap(err, "parsing encoding") } diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go index e8997a8cc241..4d036ba4809d 100644 --- a/pkg/storage/bloom/v1/test_util.go +++ b/pkg/storage/bloom/v1/test_util.go @@ -30,7 +30,7 @@ func MakeBlock(t testing.TB, nth int, fromFp, throughFp model.Fingerprint, fromT BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.Snappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, diff --git a/pkg/storage/bloom/v1/versioned_builder_test.go b/pkg/storage/bloom/v1/versioned_builder_test.go index 07240fe60358..9154daf77fc7 100644 --- a/pkg/storage/bloom/v1/versioned_builder_test.go +++ b/pkg/storage/bloom/v1/versioned_builder_test.go @@ -14,7 +14,7 @@ import ( // smallBlockOpts returns a set of block options that are suitable for testing // characterized by small page sizes -func smallBlockOpts(v Version, enc compression.Encoding) BlockOptions { +func smallBlockOpts(v Version, enc compression.Codec) BlockOptions { return BlockOptions{ Schema: Schema{ version: v, @@ -33,7 +33,7 @@ func setup(v Version) (BlockOptions, []SeriesWithBlooms, BlockWriter, BlockReade bloomsBuf := bytes.NewBuffer(nil) writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) reader := NewByteReader(indexBuf, bloomsBuf) - return smallBlockOpts(v, compression.EncNone), data, writer, reader + return smallBlockOpts(v, compression.None), data, writer, reader } func TestV3Roundtrip(t *testing.T) { diff --git 
a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index 2f236c1f40e4..3ff473934cdb 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -35,7 +35,7 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str for i := 0; i < 111; i++ { ts := model.TimeFromUnix(int64(i * chunkLen)) - cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) + cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) _, err := cs.Append(&logproto.Entry{ Timestamp: ts.Time(), diff --git a/pkg/storage/chunk/client/grpc/grpc_client_test.go b/pkg/storage/chunk/client/grpc/grpc_client_test.go index d40d825a9442..2c33c29c15b7 100644 --- a/pkg/storage/chunk/client/grpc/grpc_client_test.go +++ b/pkg/storage/chunk/client/grpc/grpc_client_test.go @@ -82,7 +82,7 @@ func TestGrpcStore(t *testing.T) { newChunkData := func() chunk.Data { return chunkenc.NewFacade( chunkenc.NewMemChunk( - chunkenc.ChunkFormatV3, compression.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0, + chunkenc.ChunkFormatV3, compression.None, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0, ), 0, 0) } diff --git a/pkg/storage/chunk/client/testutils/testutils.go b/pkg/storage/chunk/client/testutils/testutils.go index e436c1335f21..ad0e0a8de2e6 100644 --- a/pkg/storage/chunk/client/testutils/testutils.go +++ b/pkg/storage/chunk/client/testutils/testutils.go @@ -87,7 +87,7 @@ func CreateChunks(scfg config.SchemaConfig, startIndex, batchSize int, from mode } func DummyChunkFor(from, through model.Time, metric labels.Labels) chunk.Chunk { - cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) + cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, 
compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) for ts := from; ts <= through; ts = ts.Add(15 * time.Second) { _, err := cs.Append(&logproto.Entry{Timestamp: ts.Time(), Line: fmt.Sprintf("line ts=%d", ts)}) diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go index 58123957919b..27fc5a124e46 100644 --- a/pkg/storage/chunk/fetcher/fetcher_test.go +++ b/pkg/storage/chunk/fetcher/fetcher_test.go @@ -312,7 +312,7 @@ func makeChunks(now time.Time, tpls ...c) []chunk.Chunk { from := int(chk.from) / int(time.Hour) // This is only here because it's helpful for debugging. // This isn't even the write format for Loki but we dont' care for the sake of these tests. - memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) + memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.None, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) // To make sure the fetcher doesn't swap keys and buffers each chunk is built with different, but deterministic data for i := 0; i < from; i++ { _, _ = memChk.Append(&logproto.Entry{ diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go index b2d01d2e41e0..4e6c348ceb3e 100644 --- a/pkg/storage/hack/main.go +++ b/pkg/storage/hack/main.go @@ -104,7 +104,7 @@ func fillStore(cm storage.ClientMetrics) error { labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := client.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.EncLZ4_4M, headfmt, 262144, 1572864) + chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.LZ4_4M, headfmt, 262144, 1572864) for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() { entry := &logproto.Entry{ Timestamp: time.Unix(0, ts), @@ -127,7 +127,7 @@ func fillStore(cm storage.ClientMetrics) error { if flushCount 
>= maxChunks { return } - chunkEnc = chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncLZ4_64k, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 262144, 1572864) + chunkEnc = chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.LZ4_64k, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 262144, 1572864) } } }(i) diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index b1493089750a..197fcfa6f1e9 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -2037,7 +2037,7 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) { metric := labelsBuilder.Labels() fp := client.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.EncLZ4_4M, headfmt, 262144, 1572864) + chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.LZ4_4M, headfmt, 262144, 1572864) for ts := chkFrom; !ts.After(chkThrough); ts = ts.Add(time.Second) { entry := logproto.Entry{ Timestamp: ts, diff --git a/pkg/storage/stores/series/series_store_test.go b/pkg/storage/stores/series/series_store_test.go index 3bd136cb3b61..d64fd70b25b5 100644 --- a/pkg/storage/stores/series/series_store_test.go +++ b/pkg/storage/stores/series/series_store_test.go @@ -753,7 +753,7 @@ func dummyChunkWithFormat(t testing.TB, now model.Time, metric labels.Labels, fo samples := 1 chunkStart := now.Add(-time.Hour) - chk := chunkenc.NewMemChunk(format, compression.EncGZIP, headfmt, 256*1024, 0) + chk := chunkenc.NewMemChunk(format, compression.GZIP, headfmt, 256*1024, 0) for i := 0; i < samples; i++ { ts := time.Duration(i) * 15 * time.Second dup, err := chk.Append(&logproto.Entry{Timestamp: chunkStart.Time().Add(ts), Line: fmt.Sprintf("line %d", i)}) diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go index a24608675a3d..5ff8a00d9970 100644 --- a/pkg/storage/stores/series_store_write_test.go +++ b/pkg/storage/stores/series_store_write_test.go @@ -93,7 +93,7 @@ func TestChunkWriter_PutOne(t 
*testing.T) { chunkfmt, headfmt, err := periodConfig.ChunkFormat() require.NoError(t, err) - memchk := chunkenc.NewMemChunk(chunkfmt, compression.EncGZIP, headfmt, 256*1024, 0) + memchk := chunkenc.NewMemChunk(chunkfmt, compression.GZIP, headfmt, 256*1024, 0) chk := chunk.NewChunk("fake", model.Fingerprint(0), []labels.Label{{Name: "foo", Value: "bar"}}, chunkenc.NewFacade(memchk, 0, 0), 100, 400) for name, tc := range map[string]struct { diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 1390b0d9c52e..1c66e500a6b9 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -73,7 +73,7 @@ func (r Ref) Interval() Interval { type BlockRef struct { Ref - compression.Encoding + compression.Codec } func (r BlockRef) String() string { @@ -220,17 +220,17 @@ func newRefFrom(tenant, table string, md v1.BlockMetadata) Ref { } } -func newBlockRefWithEncoding(ref Ref, enc compression.Encoding) BlockRef { - return BlockRef{Ref: ref, Encoding: enc} +func newBlockRefWithEncoding(ref Ref, enc compression.Codec) BlockRef { + return BlockRef{Ref: ref, Codec: enc} } -func BlockFrom(enc compression.Encoding, tenant, table string, blk *v1.Block) (Block, error) { +func BlockFrom(enc compression.Codec, tenant, table string, blk *v1.Block) (Block, error) { md, _ := blk.Metadata() ref := newBlockRefWithEncoding(newRefFrom(tenant, table, md), enc) // TODO(owen-d): pool buf := bytes.NewBuffer(nil) - err := v1.TarCompress(ref.Encoding, buf, blk.Reader()) + err := v1.TarCompress(ref.Codec, buf, blk.Reader()) if err != nil { return Block{}, err @@ -330,7 +330,7 @@ func (b *BloomClient) GetBlock(ctx context.Context, ref BlockRef) (BlockDirector return BlockDirectory{}, fmt.Errorf("failed to create block directory %s: %w", path, err) } - err = v1.UnTarCompress(ref.Encoding, path, rc) + err = v1.UnTarCompress(ref.Codec, path, rc) if err != nil { return 
BlockDirectory{}, fmt.Errorf("failed to extract block file %s: %w", key, err) } diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index 13ce7a7c97ae..04897e897ff6 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -21,16 +21,16 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" ) -var supportedCompressions = []compression.Encoding{ - compression.EncNone, - compression.EncGZIP, - compression.EncSnappy, - compression.EncLZ4_64k, - compression.EncLZ4_256k, - compression.EncLZ4_1M, - compression.EncLZ4_4M, - compression.EncFlate, - compression.EncZstd, +var supportedCompressions = []compression.Codec{ + compression.None, + compression.GZIP, + compression.Snappy, + compression.LZ4_64k, + compression.LZ4_256k, + compression.LZ4_1M, + compression.LZ4_4M, + compression.Flate, + compression.Zstd, } func parseTime(s string) model.Time { @@ -209,7 +209,7 @@ func TestBloomClient_DeleteMetas(t *testing.T) { }) } -func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint, enc compression.Encoding) (Block, error) { +func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint, enc compression.Codec) (Block, error) { step := int64((24 * time.Hour).Seconds()) day := start.Unix() / step @@ -234,7 +234,7 @@ func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, min StartTimestamp: start, EndTimestamp: start.Add(12 * time.Hour), }, - Encoding: enc, + Codec: enc, }, Data: fp, } @@ -273,9 +273,9 @@ func TestBloomClient_GetBlocks(t *testing.T) { c, _ := newMockBloomClient(t) ctx := context.Background() - b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff, compression.EncGZIP) + b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff, compression.GZIP) 
require.NoError(t, err) - b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff, compression.EncNone) + b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff, compression.None) require.NoError(t, err) t.Run("exists", func(t *testing.T) { @@ -318,7 +318,7 @@ func TestBloomClient_PutBlock(t *testing.T) { StartTimestamp: start, EndTimestamp: start.Add(12 * time.Hour), }, - Encoding: enc, + Codec: enc, }, Data: fp, } @@ -343,11 +343,11 @@ func TestBloomClient_DeleteBlocks(t *testing.T) { c, _ := newMockBloomClient(t) ctx := context.Background() - b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff, compression.EncNone) + b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff, compression.None) require.NoError(t, err) - b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff, compression.EncGZIP) + b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff, compression.GZIP) require.NoError(t, err) - b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff, compression.EncSnappy) + b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff, compression.Snappy) require.NoError(t, err) oc := c.client.(*testutils.InMemoryObjectClient) diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go index 9361c35e90eb..6c60c64b5f2d 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go @@ -329,11 +329,11 @@ func TestFetcher_LoadBlocksFromFS(t *testing.T) { refs := []BlockRef{ // no directory for block - {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x0000, 0x0fff)}, Encoding: compression.EncNone}, + {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x0000, 0x0fff)}, Codec: 
compression.None}, // invalid directory for block - {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x1000, 0x1fff)}, Encoding: compression.EncSnappy}, + {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x1000, 0x1fff)}, Codec: compression.Snappy}, // valid directory for block - {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}, Encoding: compression.EncGZIP}, + {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}, Codec: compression.GZIP}, } dirs := []string{ localFilePathWithoutExtension(refs[0], resolver), diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver.go b/pkg/storage/stores/shipper/bloomshipper/resolver.go index 3115f731fe13..f101b55896a1 100644 --- a/pkg/storage/stores/shipper/bloomshipper/resolver.go +++ b/pkg/storage/stores/shipper/bloomshipper/resolver.go @@ -81,7 +81,7 @@ func (defaultKeyResolver) ParseMetaKey(loc Location) (MetaRef, error) { } func (defaultKeyResolver) Block(ref BlockRef) Location { - ext := blockExtension + compression.ToFileExtension(ref.Encoding) + ext := blockExtension + compression.ToFileExtension(ref.Codec) return simpleLocation{ BloomPrefix, fmt.Sprintf("%v", ref.TableName), @@ -95,7 +95,7 @@ func (defaultKeyResolver) Block(ref BlockRef) Location { func (defaultKeyResolver) ParseBlockKey(loc Location) (BlockRef, error) { dir, fn := path.Split(loc.Addr()) - ext, enc := path.Ext(fn), compression.EncNone + ext, enc := path.Ext(fn), compression.None if ext != "" && ext != blockExtension { // trim compression extension fn = strings.TrimSuffix(fn, ext) @@ -142,7 +142,7 @@ func (defaultKeyResolver) ParseBlockKey(loc Location) (BlockRef, error) { EndTimestamp: interval.End, Checksum: uint32(checksum), }, - Encoding: enc, + Codec: enc, }, nil } @@ -286,9 +286,9 @@ func (ls locations) LocalPath() string { } func cacheKey(ref BlockRef) string { - return strings.TrimSuffix(defaultKeyResolver{}.Block(ref).Addr(), 
blockExtension+compression.ToFileExtension(ref.Encoding)) + return strings.TrimSuffix(defaultKeyResolver{}.Block(ref).Addr(), blockExtension+compression.ToFileExtension(ref.Codec)) } func localFilePathWithoutExtension(ref BlockRef, res KeyResolver) string { - return strings.TrimSuffix(res.Block(ref).LocalPath(), blockExtension+compression.ToFileExtension(ref.Encoding)) + return strings.TrimSuffix(res.Block(ref).LocalPath(), blockExtension+compression.ToFileExtension(ref.Codec)) } diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver_test.go b/pkg/storage/stores/shipper/bloomshipper/resolver_test.go index 259bf7b2db3a..21da2e4e6909 100644 --- a/pkg/storage/stores/shipper/bloomshipper/resolver_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/resolver_test.go @@ -33,17 +33,17 @@ func TestResolver_ParseMetaKey(t *testing.T) { func TestResolver_ParseBlockKey(t *testing.T) { for _, tc := range []struct { - srcEnc, dstEnc compression.Encoding + srcEnc, dstEnc compression.Codec }{ - {compression.EncNone, compression.EncNone}, - {compression.EncGZIP, compression.EncGZIP}, - {compression.EncSnappy, compression.EncSnappy}, - {compression.EncLZ4_64k, compression.EncLZ4_4M}, - {compression.EncLZ4_256k, compression.EncLZ4_4M}, - {compression.EncLZ4_1M, compression.EncLZ4_4M}, - {compression.EncLZ4_4M, compression.EncLZ4_4M}, - {compression.EncFlate, compression.EncFlate}, - {compression.EncZstd, compression.EncZstd}, + {compression.None, compression.None}, + {compression.GZIP, compression.GZIP}, + {compression.Snappy, compression.Snappy}, + {compression.LZ4_64k, compression.LZ4_4M}, + {compression.LZ4_256k, compression.LZ4_4M}, + {compression.LZ4_1M, compression.LZ4_4M}, + {compression.LZ4_4M, compression.LZ4_4M}, + {compression.Flate, compression.Flate}, + {compression.Zstd, compression.Zstd}, } { t.Run(tc.srcEnc.String(), func(t *testing.T) { r := defaultKeyResolver{} @@ -56,7 +56,7 @@ func TestResolver_ParseBlockKey(t *testing.T) { EndTimestamp: 3600000, 
Checksum: 43981, }, - Encoding: tc.srcEnc, + Codec: tc.srcEnc, } // encode block ref as string @@ -69,8 +69,8 @@ func TestResolver_ParseBlockKey(t *testing.T) { parsed, err := r.ParseBlockKey(key(path)) require.NoError(t, err) expected := BlockRef{ - Ref: ref.Ref, - Encoding: tc.dstEnc, + Ref: ref.Ref, + Codec: tc.dstEnc, } require.Equal(t, expected, parsed) }) diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go index 674e0c02a506..92adcf924922 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go @@ -116,7 +116,7 @@ func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start err := blockWriter.Init() require.NoError(t, err) - enc := compression.EncGZIP + enc := compression.GZIP err = v1.TarCompress(enc, fp, v1.NewDirectoryBlockReader(tmpDir)) require.NoError(t, err) @@ -130,7 +130,7 @@ func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start StartTimestamp: start, EndTimestamp: start.Add(12 * time.Hour), }, - Encoding: enc, + Codec: enc, }, Data: fp, } diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go index a7ea7af3b05e..04948c38a17c 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go @@ -32,7 +32,7 @@ func createChunk(t testing.TB, chunkFormat byte, headBlockFmt chunkenc.HeadBlock labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := ingesterclient.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkFormat, compression.EncSnappy, headBlockFmt, blockSize, targetSize) + chunkEnc := chunkenc.NewMemChunk(chunkFormat, compression.Snappy, headBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { dup, err := 
chunkEnc.Append(&logproto.Entry{ diff --git a/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go b/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go index 36dc13850956..d0d04a5104a5 100644 --- a/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go +++ b/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go @@ -145,7 +145,7 @@ func (t *indexSet) uploadIndex(ctx context.Context, idx index.Index) error { } }() - gzipPool := compression.GetWriterPool(compression.EncGZIP) + gzipPool := compression.GetWriterPool(compression.GZIP) compressedWriter := gzipPool.GetWriter(f) defer gzipPool.PutWriter(compressedWriter) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index dd535197afb3..a0dc75999692 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -109,7 +109,7 @@ func newChunk(chunkFormat byte, headBlockFmt chunkenc.HeadBlockFmt, stream logpr lbs = builder.Labels() } from, through := loki_util.RoundToMilliseconds(stream.Entries[0].Timestamp, stream.Entries[len(stream.Entries)-1].Timestamp) - chk := chunkenc.NewMemChunk(chunkFormat, compression.EncGZIP, headBlockFmt, 256*1024, 0) + chk := chunkenc.NewMemChunk(chunkFormat, compression.GZIP, headBlockFmt, 256*1024, 0) for _, e := range stream.Entries { _, _ = chk.Append(&e) } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 153073b74e6c..9b362f20704f 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -490,7 +490,7 @@ func (l *Limits) Validate() error { return errors.Wrap(err, "invalid tsdb sharding strategy") } - if _, err := compression.ParseEncoding(l.BloomBlockEncoding); err != nil { + if _, err := compression.ParseCodec(l.BloomBlockEncoding); err != nil { return err } diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 87fab6837029..19278c77a342 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -339,7 +339,7 @@ func TestLimitsValidation(t 
*testing.T) { }, { limits: Limits{DeletionMode: "disabled", BloomBlockEncoding: "unknown"}, - expected: fmt.Errorf("invalid encoding: unknown, supported: %s", compression.SupportedEncoding()), + expected: fmt.Errorf("invalid encoding: unknown, supported: %s", compression.SupportedCodecs()), }, } { desc := fmt.Sprintf("%s/%s", tc.limits.DeletionMode, tc.limits.BloomBlockEncoding) diff --git a/tools/tsdb/migrate-versions/main.go b/tools/tsdb/migrate-versions/main.go index e4fb39e69a4f..d3853442b6e8 100644 --- a/tools/tsdb/migrate-versions/main.go +++ b/tools/tsdb/migrate-versions/main.go @@ -257,7 +257,7 @@ func uploadFile(idx shipperindex.Index, indexStorageClient shipperstorage.Client } }() - gzipPool := compression.GetWriterPool(compression.EncGZIP) + gzipPool := compression.GetWriterPool(compression.GZIP) compressedWriter := gzipPool.GetWriter(f) defer gzipPool.PutWriter(compressedWriter)