diff --git a/docs/sources/setup/install/helm/reference.md b/docs/sources/setup/install/helm/reference.md
index 8fc5427a07424..a337eb4530de0 100644
--- a/docs/sources/setup/install/helm/reference.md
+++ b/docs/sources/setup/install/helm/reference.md
@@ -3039,6 +3039,7 @@ null
},
"provisioner": {
"additionalTenants": [],
+ "affinity": {},
"annotations": {},
"enabled": true,
"env": [],
@@ -3051,6 +3052,7 @@ null
"tag": null
},
"labels": {},
+ "nodeSelector": {},
"priorityClassName": null,
"provisionedSecretPrefix": null,
"securityContext": {
@@ -3058,9 +3060,11 @@ null
"runAsGroup": 10001,
"runAsNonRoot": true,
"runAsUser": 10001
- }
+ },
+ "tolerations": []
},
"tokengen": {
+ "affinity": {},
"annotations": {},
"enabled": true,
"env": [],
@@ -3069,6 +3073,7 @@ null
"extraVolumeMounts": [],
"extraVolumes": [],
"labels": {},
+ "nodeSelector": {},
"priorityClassName": "",
"securityContext": {
"fsGroup": 10001,
@@ -3222,6 +3227,7 @@ null
{
"additionalTenants": [],
+ "affinity": {},
"annotations": {},
"enabled": true,
"env": [],
@@ -3234,6 +3240,7 @@ null
"tag": null
},
"labels": {},
+ "nodeSelector": {},
"priorityClassName": null,
"provisionedSecretPrefix": null,
"securityContext": {
@@ -3241,7 +3248,8 @@ null
"runAsGroup": 10001,
"runAsNonRoot": true,
"runAsUser": 10001
- }
+ },
+ "tolerations": []
}
|
@@ -3253,6 +3261,15 @@ null
[]
+ |
+
+
+ enterprise.provisioner.affinity |
+ object |
+	Affinity for provisioner Pods |
+
+{}
+
|
@@ -3358,6 +3375,15 @@ null
{}
+ |
+
+
+ enterprise.provisioner.nodeSelector |
+ object |
+	Node selector for provisioner Pods |
+
+{}
+
|
@@ -3390,6 +3416,15 @@ null
"runAsUser": 10001
}
+
+
+
+ enterprise.provisioner.tolerations |
+ list |
+	Tolerations for provisioner Pods |
+
+[]
+
|
@@ -3398,6 +3433,7 @@ null
Configuration for `tokengen` target |
{
+ "affinity": {},
"annotations": {},
"enabled": true,
"env": [],
@@ -3406,6 +3442,7 @@ null
"extraVolumeMounts": [],
"extraVolumes": [],
"labels": {},
+ "nodeSelector": {},
"priorityClassName": "",
"securityContext": {
"fsGroup": 10001,
@@ -3417,6 +3454,15 @@ null
"tolerations": []
}
+ |
+
+
+ enterprise.tokengen.affinity |
+ object |
+ Affinity for tokengen Pods |
+
+{}
+
|
@@ -3489,6 +3535,15 @@ true
{}
+ |
+
+
+ enterprise.tokengen.nodeSelector |
+ object |
+ Node selector for tokengen Pods |
+
+{}
+
|
diff --git a/operator/internal/manifests/openshift/alertingrule.go b/operator/internal/manifests/openshift/alertingrule.go
index 22923ed15482e..e4869e9f7ca5c 100644
--- a/operator/internal/manifests/openshift/alertingrule.go
+++ b/operator/internal/manifests/openshift/alertingrule.go
@@ -5,23 +5,32 @@ import lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
func AlertingRuleTenantLabels(ar *lokiv1.AlertingRule) {
switch ar.Spec.TenantID {
case tenantApplication:
- for groupIdx, group := range ar.Spec.Groups {
- group := group
- for ruleIdx, rule := range group.Rules {
- rule := rule
- if rule.Labels == nil {
- rule.Labels = map[string]string{}
- }
- rule.Labels[opaDefaultLabelMatcher] = ar.Namespace
- group.Rules[ruleIdx] = rule
- }
- ar.Spec.Groups[groupIdx] = group
- }
- case tenantInfrastructure, tenantAudit:
- // Do nothing
- case tenantNetwork:
- // Do nothing
+ appendAlertingRuleLabels(ar, map[string]string{
+ opaDefaultLabelMatcher: ar.Namespace,
+ ocpMonitoringGroupByLabel: ar.Namespace,
+ })
+ case tenantInfrastructure, tenantAudit, tenantNetwork:
+ appendAlertingRuleLabels(ar, map[string]string{
+ ocpMonitoringGroupByLabel: ar.Namespace,
+ })
default:
// Do nothing
}
}
+
+func appendAlertingRuleLabels(ar *lokiv1.AlertingRule, labels map[string]string) {
+ for groupIdx, group := range ar.Spec.Groups {
+ for ruleIdx, rule := range group.Rules {
+ if rule.Labels == nil {
+ rule.Labels = map[string]string{}
+ }
+
+ for name, value := range labels {
+ rule.Labels[name] = value
+ }
+
+ group.Rules[ruleIdx] = rule
+ }
+ ar.Spec.Groups[groupIdx] = group
+ }
+}
diff --git a/operator/internal/manifests/openshift/alertingrule_test.go b/operator/internal/manifests/openshift/alertingrule_test.go
index 91da560e2a6df..2a1d032e8ed47 100644
--- a/operator/internal/manifests/openshift/alertingrule_test.go
+++ b/operator/internal/manifests/openshift/alertingrule_test.go
@@ -46,7 +46,8 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
{
Alert: "alert",
Labels: map[string]string{
- opaDefaultLabelMatcher: "test-ns",
+ opaDefaultLabelMatcher: "test-ns",
+ ocpMonitoringGroupByLabel: "test-ns",
},
},
},
@@ -57,6 +58,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
},
{
rule: &lokiv1.AlertingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.AlertingRuleSpec{
TenantID: tenantInfrastructure,
Groups: []*lokiv1.AlertingRuleGroup{
@@ -72,6 +76,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
},
},
want: &lokiv1.AlertingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.AlertingRuleSpec{
TenantID: tenantInfrastructure,
Groups: []*lokiv1.AlertingRuleGroup{
@@ -80,6 +87,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
Rules: []*lokiv1.AlertingRuleGroupSpec{
{
Alert: "alert",
+ Labels: map[string]string{
+ ocpMonitoringGroupByLabel: "test-ns",
+ },
},
},
},
@@ -89,6 +99,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
},
{
rule: &lokiv1.AlertingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.AlertingRuleSpec{
TenantID: tenantAudit,
Groups: []*lokiv1.AlertingRuleGroup{
@@ -104,6 +117,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
},
},
want: &lokiv1.AlertingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.AlertingRuleSpec{
TenantID: tenantAudit,
Groups: []*lokiv1.AlertingRuleGroup{
@@ -112,6 +128,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
Rules: []*lokiv1.AlertingRuleGroupSpec{
{
Alert: "alert",
+ Labels: map[string]string{
+ ocpMonitoringGroupByLabel: "test-ns",
+ },
},
},
},
@@ -121,6 +140,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
},
{
rule: &lokiv1.AlertingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.AlertingRuleSpec{
TenantID: tenantNetwork,
Groups: []*lokiv1.AlertingRuleGroup{
@@ -136,6 +158,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
},
},
want: &lokiv1.AlertingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.AlertingRuleSpec{
TenantID: tenantNetwork,
Groups: []*lokiv1.AlertingRuleGroup{
@@ -144,6 +169,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
Rules: []*lokiv1.AlertingRuleGroupSpec{
{
Alert: "alert",
+ Labels: map[string]string{
+ ocpMonitoringGroupByLabel: "test-ns",
+ },
},
},
},
@@ -153,6 +181,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
},
{
rule: &lokiv1.AlertingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.AlertingRuleSpec{
TenantID: "unknown",
Groups: []*lokiv1.AlertingRuleGroup{
@@ -168,6 +199,9 @@ func TestAlertingRuleTenantLabels(t *testing.T) {
},
},
want: &lokiv1.AlertingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.AlertingRuleSpec{
TenantID: "unknown",
Groups: []*lokiv1.AlertingRuleGroup{
diff --git a/operator/internal/manifests/openshift/opa_openshift.go b/operator/internal/manifests/openshift/opa_openshift.go
index 9175983f89e14..ccf5eac09b7a7 100644
--- a/operator/internal/manifests/openshift/opa_openshift.go
+++ b/operator/internal/manifests/openshift/opa_openshift.go
@@ -13,14 +13,15 @@ import (
)
const (
- envRelatedImageOPA = "RELATED_IMAGE_OPA"
- defaultOPAImage = "quay.io/observatorium/opa-openshift:latest"
- opaContainerName = "opa"
- opaDefaultPackage = "lokistack"
- opaDefaultAPIGroup = "loki.grafana.com"
- opaMetricsPortName = "opa-metrics"
- opaDefaultLabelMatcher = "kubernetes_namespace_name"
- opaNetworkLabelMatchers = "SrcK8S_Namespace,DstK8S_Namespace"
+ envRelatedImageOPA = "RELATED_IMAGE_OPA"
+ defaultOPAImage = "quay.io/observatorium/opa-openshift:latest"
+ opaContainerName = "opa"
+ opaDefaultPackage = "lokistack"
+ opaDefaultAPIGroup = "loki.grafana.com"
+ opaMetricsPortName = "opa-metrics"
+ opaDefaultLabelMatcher = "kubernetes_namespace_name"
+ opaNetworkLabelMatchers = "SrcK8S_Namespace,DstK8S_Namespace"
+ ocpMonitoringGroupByLabel = "namespace"
)
func newOPAOpenShiftContainer(mode lokiv1.ModeType, secretVolumeName, tlsDir, minTLSVersion, ciphers string, withTLS bool, adminGroups []string) corev1.Container {
diff --git a/operator/internal/manifests/openshift/recordingrule.go b/operator/internal/manifests/openshift/recordingrule.go
new file mode 100644
index 0000000000000..97be1bb4a17ec
--- /dev/null
+++ b/operator/internal/manifests/openshift/recordingrule.go
@@ -0,0 +1,36 @@
+package openshift
+
+import lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
+
+func RecordingRuleTenantLabels(r *lokiv1.RecordingRule) {
+ switch r.Spec.TenantID {
+ case tenantApplication:
+ appendRecordingRuleLabels(r, map[string]string{
+ opaDefaultLabelMatcher: r.Namespace,
+ ocpMonitoringGroupByLabel: r.Namespace,
+ })
+ case tenantInfrastructure, tenantAudit, tenantNetwork:
+ appendRecordingRuleLabels(r, map[string]string{
+ ocpMonitoringGroupByLabel: r.Namespace,
+ })
+ default:
+ // Do nothing
+ }
+}
+
+func appendRecordingRuleLabels(r *lokiv1.RecordingRule, labels map[string]string) {
+ for groupIdx, group := range r.Spec.Groups {
+ for ruleIdx, rule := range group.Rules {
+ if rule.Labels == nil {
+ rule.Labels = map[string]string{}
+ }
+
+ for name, value := range labels {
+ rule.Labels[name] = value
+ }
+
+ group.Rules[ruleIdx] = rule
+ }
+ r.Spec.Groups[groupIdx] = group
+ }
+}
diff --git a/operator/internal/manifests/openshift/recordingrule_test.go b/operator/internal/manifests/openshift/recordingrule_test.go
index 49e30de999f35..6a620bc85d8de 100644
--- a/operator/internal/manifests/openshift/recordingrule_test.go
+++ b/operator/internal/manifests/openshift/recordingrule_test.go
@@ -46,7 +46,8 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
{
Record: "record",
Labels: map[string]string{
- opaDefaultLabelMatcher: "test-ns",
+ opaDefaultLabelMatcher: "test-ns",
+ ocpMonitoringGroupByLabel: "test-ns",
},
},
},
@@ -57,6 +58,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
},
{
rule: &lokiv1.RecordingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.RecordingRuleSpec{
TenantID: tenantInfrastructure,
Groups: []*lokiv1.RecordingRuleGroup{
@@ -72,6 +76,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
},
},
want: &lokiv1.RecordingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.RecordingRuleSpec{
TenantID: tenantInfrastructure,
Groups: []*lokiv1.RecordingRuleGroup{
@@ -80,6 +87,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
Rules: []*lokiv1.RecordingRuleGroupSpec{
{
Record: "record",
+ Labels: map[string]string{
+ ocpMonitoringGroupByLabel: "test-ns",
+ },
},
},
},
@@ -89,6 +99,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
},
{
rule: &lokiv1.RecordingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.RecordingRuleSpec{
TenantID: tenantAudit,
Groups: []*lokiv1.RecordingRuleGroup{
@@ -104,6 +117,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
},
},
want: &lokiv1.RecordingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.RecordingRuleSpec{
TenantID: tenantAudit,
Groups: []*lokiv1.RecordingRuleGroup{
@@ -112,6 +128,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
Rules: []*lokiv1.RecordingRuleGroupSpec{
{
Record: "record",
+ Labels: map[string]string{
+ ocpMonitoringGroupByLabel: "test-ns",
+ },
},
},
},
@@ -121,6 +140,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
},
{
rule: &lokiv1.RecordingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.RecordingRuleSpec{
TenantID: tenantNetwork,
Groups: []*lokiv1.RecordingRuleGroup{
@@ -136,6 +158,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
},
},
want: &lokiv1.RecordingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.RecordingRuleSpec{
TenantID: tenantNetwork,
Groups: []*lokiv1.RecordingRuleGroup{
@@ -144,6 +169,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
Rules: []*lokiv1.RecordingRuleGroupSpec{
{
Record: "record",
+ Labels: map[string]string{
+ ocpMonitoringGroupByLabel: "test-ns",
+ },
},
},
},
@@ -153,6 +181,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
},
{
rule: &lokiv1.RecordingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.RecordingRuleSpec{
TenantID: "unknown",
Groups: []*lokiv1.RecordingRuleGroup{
@@ -168,6 +199,9 @@ func TestRecordingRuleTenantLabels(t *testing.T) {
},
},
want: &lokiv1.RecordingRule{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test-ns",
+ },
Spec: lokiv1.RecordingRuleSpec{
TenantID: "unknown",
Groups: []*lokiv1.RecordingRuleGroup{
diff --git a/operator/internal/manifests/openshift/recordngrule.go b/operator/internal/manifests/openshift/recordngrule.go
deleted file mode 100644
index e4448affeae99..0000000000000
--- a/operator/internal/manifests/openshift/recordngrule.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package openshift
-
-import lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
-
-func RecordingRuleTenantLabels(r *lokiv1.RecordingRule) {
- switch r.Spec.TenantID {
- case tenantApplication:
- for groupIdx, group := range r.Spec.Groups {
- group := group
- for ruleIdx, rule := range group.Rules {
- rule := rule
- if rule.Labels == nil {
- rule.Labels = map[string]string{}
- }
- rule.Labels[opaDefaultLabelMatcher] = r.Namespace
- group.Rules[ruleIdx] = rule
- }
- r.Spec.Groups[groupIdx] = group
- }
- case tenantInfrastructure, tenantAudit:
- // Do nothing
- case tenantNetwork:
- // Do nothing
- default:
- // Do nothing
- }
-}
diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go
index fdeab9cf92c75..63950d7eadcbd 100644
--- a/pkg/bloombuild/builder/builder.go
+++ b/pkg/bloombuild/builder/builder.go
@@ -34,7 +34,7 @@ import (
)
// TODO(chaudum): Make configurable via (per-tenant?) setting.
-var blockCompressionAlgo = compression.EncNone
+var defaultBlockCompressionCodec = compression.None
type Builder struct {
services.Service
@@ -336,7 +336,7 @@ func (b *Builder) processTask(
return nil, fmt.Errorf("failed to get client: %w", err)
}
- blockEnc, err := compression.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant))
+ blockEnc, err := compression.ParseCodec(b.limits.BloomBlockEncoding(task.Tenant))
if err != nil {
return nil, fmt.Errorf("failed to parse block encoding: %w", err)
}
@@ -407,7 +407,7 @@ func (b *Builder) processTask(
blockCt++
blk := newBlocks.At()
- built, err := bloomshipper.BlockFrom(blockCompressionAlgo, tenant, task.Table.Addr(), blk)
+ built, err := bloomshipper.BlockFrom(defaultBlockCompressionCodec, tenant, task.Table.Addr(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
if err = blk.Reader().Cleanup(); err != nil {
diff --git a/pkg/bloombuild/builder/spec_test.go b/pkg/bloombuild/builder/spec_test.go
index 330c0552b657f..23afa58754743 100644
--- a/pkg/bloombuild/builder/spec_test.go
+++ b/pkg/bloombuild/builder/spec_test.go
@@ -115,7 +115,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v2.Iterator[*v1.Ser
func TestSimpleBloomGenerator(t *testing.T) {
const maxBlockSize = 100 << 20 // 100MB
- for _, enc := range []compression.Encoding{compression.EncNone, compression.EncGZIP, compression.EncSnappy} {
+ for _, enc := range []compression.Codec{compression.None, compression.GZIP, compression.Snappy} {
for _, tc := range []struct {
desc string
fromSchema, toSchema v1.BlockOptions
diff --git a/pkg/bloombuild/common/tsdb.go b/pkg/bloombuild/common/tsdb.go
index ea31767cca0b2..e45ff4b153c7c 100644
--- a/pkg/bloombuild/common/tsdb.go
+++ b/pkg/bloombuild/common/tsdb.go
@@ -102,7 +102,7 @@ func (b *BloomTSDBStore) LoadTSDB(
}
defer data.Close()
- decompressorPool := compression.GetReaderPool(compression.EncGZIP)
+ decompressorPool := compression.GetReaderPool(compression.GZIP)
decompressor, err := decompressorPool.GetReader(data)
if err != nil {
return nil, errors.Wrap(err, "failed to get decompressor")
diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go
index ea780c98e8eed..9523a45795579 100644
--- a/pkg/bloombuild/planner/planner_test.go
+++ b/pkg/bloombuild/planner/planner_test.go
@@ -188,7 +188,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := v1.NewByteReader(indexBuf, bloomsBuf)
- blockOpts := v1.NewBlockOptions(compression.EncNone, 0, 0)
+ blockOpts := v1.NewBlockOptions(compression.None, 0, 0)
builder, err := v1.NewBlockBuilder(blockOpts, writer)
if err != nil {
@@ -202,7 +202,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
block := v1.NewBlock(reader, v1.NewMetrics(nil))
buf := bytes.NewBuffer(nil)
- if err := v1.TarCompress(ref.Encoding, buf, block.Reader()); err != nil {
+ if err := v1.TarCompress(ref.Codec, buf, block.Reader()); err != nil {
return bloomshipper.Block{}, err
}
diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go
index e28298605118c..b95f92c2fdfd9 100644
--- a/pkg/chunkenc/dumb_chunk.go
+++ b/pkg/chunkenc/dumb_chunk.go
@@ -70,7 +70,7 @@ func (c *dumbChunk) Utilization() float64 {
return float64(len(c.entries)) / float64(tmpNumEntries)
}
-func (c *dumbChunk) Encoding() compression.Encoding { return compression.EncNone }
+func (c *dumbChunk) Encoding() compression.Codec { return compression.None }
// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go
index 057fc8b985ad3..e894b687236c0 100644
--- a/pkg/chunkenc/interface.go
+++ b/pkg/chunkenc/interface.go
@@ -68,7 +68,7 @@ type Chunk interface {
UncompressedSize() int
CompressedSize() int
Close() error
- Encoding() compression.Encoding
+ Encoding() compression.Codec
Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
}
diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go
index 03f33b8176729..790210d3af8b2 100644
--- a/pkg/chunkenc/memchunk.go
+++ b/pkg/chunkenc/memchunk.go
@@ -132,7 +132,7 @@ type MemChunk struct {
head HeadBlock
format byte
- encoding compression.Encoding
+ encoding compression.Codec
headFmt HeadBlockFmt
// compressed size of chunk. Set when chunk is cut or while decoding chunk from storage.
@@ -355,7 +355,7 @@ type entry struct {
}
// NewMemChunk returns a new in-mem chunk.
-func NewMemChunk(chunkFormat byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
+func NewMemChunk(chunkFormat byte, enc compression.Codec, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return newMemChunkWithFormat(chunkFormat, enc, head, blockSize, targetSize)
}
@@ -370,7 +370,7 @@ func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) {
}
// NewMemChunk returns a new in-mem chunk.
-func newMemChunkWithFormat(format byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
+func newMemChunkWithFormat(format byte, enc compression.Codec, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
panicIfInvalidFormat(format, head)
symbolizer := newSymbolizer()
@@ -414,10 +414,10 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
bc.format = version
switch version {
case ChunkFormatV1:
- bc.encoding = compression.EncGZIP
+ bc.encoding = compression.GZIP
case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
// format v2+ has a byte for block encoding.
- enc := compression.Encoding(db.byte())
+ enc := compression.Codec(db.byte())
if db.err() != nil {
return nil, errors.Wrap(db.err(), "verifying encoding")
}
@@ -777,7 +777,7 @@ func MemchunkFromCheckpoint(chk, head []byte, desiredIfNotUnordered HeadBlockFmt
}
// Encoding implements Chunk.
-func (c *MemChunk) Encoding() compression.Encoding {
+func (c *MemChunk) Encoding() compression.Codec {
return c.encoding
}
@@ -1173,7 +1173,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
// then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the
// chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former.
type encBlock struct {
- enc compression.Encoding
+ enc compression.Codec
format byte
symbolizer *symbolizer
block
diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go
index 987a5d88b286e..24d4ab2d2c2cd 100644
--- a/pkg/chunkenc/memchunk_test.go
+++ b/pkg/chunkenc/memchunk_test.go
@@ -32,16 +32,16 @@ import (
"github.com/grafana/loki/v3/pkg/util/filter"
)
-var testEncodings = []compression.Encoding{
- compression.EncNone,
- compression.EncGZIP,
- compression.EncLZ4_64k,
- compression.EncLZ4_256k,
- compression.EncLZ4_1M,
- compression.EncLZ4_4M,
- compression.EncSnappy,
- compression.EncFlate,
- compression.EncZstd,
+var testEncodings = []compression.Codec{
+ compression.None,
+ compression.GZIP,
+ compression.LZ4_64k,
+ compression.LZ4_256k,
+ compression.LZ4_1M,
+ compression.LZ4_4M,
+ compression.Snappy,
+ compression.Flate,
+ compression.Zstd,
}
var (
@@ -299,7 +299,7 @@ func TestCorruptChunk(t *testing.T) {
func TestReadFormatV1(t *testing.T) {
t.Parallel()
- c := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
+ c := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
fillChunk(c)
// overrides to v1 for testing that specific version.
c.format = ChunkFormatV1
@@ -391,7 +391,7 @@ func TestRoundtripV2(t *testing.T) {
}
}
-func testNameWithFormats(enc compression.Encoding, chunkFormat byte, headBlockFmt HeadBlockFmt) string {
+func testNameWithFormats(enc compression.Codec, chunkFormat byte, headBlockFmt HeadBlockFmt) string {
return fmt.Sprintf("encoding:%v chunkFormat:%v headBlockFmt:%v", enc, chunkFormat, headBlockFmt)
}
@@ -558,7 +558,7 @@ func TestChunkFilling(t *testing.T) {
func TestGZIPChunkTargetSize(t *testing.T) {
t.Parallel()
- chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
+ chk := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
lineSize := 512
entry := &logproto.Entry{
@@ -681,7 +681,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()
- tester(t, NewMemChunk(ChunkFormatV3, compression.EncGZIP, f, testBlockSize, testTargetSize))
+ tester(t, NewMemChunk(ChunkFormatV3, compression.GZIP, f, testBlockSize, testTargetSize))
})
}
}
@@ -726,7 +726,7 @@ func TestChunkSize(t *testing.T) {
}
func TestChunkStats(t *testing.T) {
- c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, testBlockSize, 0)
+ c := NewMemChunk(ChunkFormatV4, compression.Snappy, DefaultTestHeadBlockFmt, testBlockSize, 0)
first := time.Now()
entry := &logproto.Entry{
Timestamp: first,
@@ -968,7 +968,7 @@ func BenchmarkBackwardIterator(b *testing.B) {
for _, bs := range testBlockSizes {
b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) {
b.ReportAllocs()
- c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize)
+ c := NewMemChunk(ChunkFormatV4, compression.Snappy, DefaultTestHeadBlockFmt, bs, testTargetSize)
_ = fillChunk(c)
b.ResetTimer()
for n := 0; n < b.N; n++ {
@@ -1082,7 +1082,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
func TestMemChunk_IteratorBounds(t *testing.T) {
createChunk := func() *MemChunk {
t.Helper()
- c := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, 1e6, 1e6)
+ c := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, 1e6, 1e6)
if _, err := c.Append(&logproto.Entry{
Timestamp: time.Unix(0, 1),
@@ -1168,9 +1168,9 @@ func TestMemchunkLongLine(t *testing.T) {
func TestBytesWith(t *testing.T) {
t.Parallel()
- exp, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil)
+ exp, err := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil)
require.Nil(t, err)
- out, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
+ out, err := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
require.Nil(t, err)
require.Equal(t, exp, out)
@@ -1181,8 +1181,8 @@ func TestCheckpointEncoding(t *testing.T) {
blockSize, targetSize := 256*1024, 1500*1024
for _, f := range allPossibleFormats {
- t.Run(testNameWithFormats(compression.EncSnappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) {
- c := newMemChunkWithFormat(f.chunkFormat, compression.EncSnappy, f.headBlockFmt, blockSize, targetSize)
+ t.Run(testNameWithFormats(compression.Snappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) {
+ c := newMemChunkWithFormat(f.chunkFormat, compression.Snappy, f.headBlockFmt, blockSize, targetSize)
// add a few entries
for i := 0; i < 5; i++ {
@@ -1267,7 +1267,7 @@ var (
func BenchmarkBufferedIteratorLabels(b *testing.B) {
for _, f := range HeadBlockFmts {
b.Run(f.String(), func(b *testing.B) {
- c := NewMemChunk(ChunkFormatV3, compression.EncSnappy, f, testBlockSize, testTargetSize)
+ c := NewMemChunk(ChunkFormatV3, compression.Snappy, f, testBlockSize, testTargetSize)
_ = fillChunk(c)
labelsSet := []labels.Labels{
@@ -1367,8 +1367,8 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {
func Test_HeadIteratorReverse(t *testing.T) {
for _, testData := range allPossibleFormats {
- t.Run(testNameWithFormats(compression.EncSnappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) {
- c := newMemChunkWithFormat(testData.chunkFormat, compression.EncSnappy, testData.headBlockFmt, testBlockSize, testTargetSize)
+ t.Run(testNameWithFormats(compression.Snappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) {
+ c := newMemChunkWithFormat(testData.chunkFormat, compression.Snappy, testData.headBlockFmt, testBlockSize, testTargetSize)
genEntry := func(i int64) *logproto.Entry {
return &logproto.Entry{
Timestamp: time.Unix(0, i),
@@ -1483,7 +1483,7 @@ func TestMemChunk_Rebound(t *testing.T) {
}
func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk {
- chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
+ chk := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
for ; from.Before(through); from = from.Add(time.Second) {
_, err := chk.Append(&logproto.Entry{
Line: from.String(),
@@ -1604,7 +1604,7 @@ func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) {
}
func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time, withStructuredMetadata bool) *MemChunk {
- chk := NewMemChunk(ChunkFormatV4, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
+ chk := NewMemChunk(ChunkFormatV4, compression.GZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
t.Logf("from : %v", from.String())
t.Logf("through: %v", through.String())
var structuredMetadata push.LabelsAdapter
@@ -1753,7 +1753,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
for _, format := range allPossibleFormats {
t.Run(fmt.Sprintf("chunk_v%d_head_%s", format.chunkFormat, format.headBlockFmt), func(t *testing.T) {
- chk := newMemChunkWithFormat(format.chunkFormat, compression.EncNone, format.headBlockFmt, 1024, tc.targetSize)
+ chk := newMemChunkWithFormat(format.chunkFormat, compression.None, format.headBlockFmt, 1024, tc.targetSize)
chk.blocks = make([]block, tc.nBlocks)
chk.cutBlockSize = tc.cutBlockSize
@@ -2055,7 +2055,7 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) {
t.Run(fmt.Sprintf("chunkFormat:%v headBlockFmt:%v", format.chunkFormat, format.headBlockFmt), func(t *testing.T) {
for incorrectOffsetBlockNum := 0; incorrectOffsetBlockNum < 3; incorrectOffsetBlockNum++ {
t.Run(fmt.Sprintf("inorrect offset block: %d", incorrectOffsetBlockNum), func(t *testing.T) {
- chk := NewMemChunk(format.chunkFormat, compression.EncNone, format.headBlockFmt, blockSize, testTargetSize)
+ chk := NewMemChunk(format.chunkFormat, compression.None, format.headBlockFmt, blockSize, testTargetSize)
ts := time.Now().Unix()
for i := 0; i < 3; i++ {
dup, err := chk.Append(&logproto.Entry{
diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go
index fb341aaa8db93..509a34673fda0 100644
--- a/pkg/chunkenc/unordered_test.go
+++ b/pkg/chunkenc/unordered_test.go
@@ -451,7 +451,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
}
func TestUnorderedChunkIterators(t *testing.T) {
- c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
+ c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for i := 0; i < 100; i++ {
// push in reverse order
dup, err := c.Append(&logproto.Entry{
@@ -497,11 +497,11 @@ func TestUnorderedChunkIterators(t *testing.T) {
}
func BenchmarkUnorderedRead(b *testing.B) {
- legacy := NewMemChunk(ChunkFormatV3, compression.EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
+ legacy := NewMemChunk(ChunkFormatV3, compression.Snappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkClose(legacy, false)
- ordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+ ordered := NewMemChunk(ChunkFormatV3, compression.Snappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkClose(ordered, false)
- unordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+ unordered := NewMemChunk(ChunkFormatV3, compression.Snappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkRandomOrder(unordered, false)
tcs := []struct {
@@ -559,7 +559,7 @@ func BenchmarkUnorderedRead(b *testing.B) {
}
func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
- c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
+ c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkRandomOrder(c, false)
ct := 0
@@ -596,7 +596,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
}
func chunkFrom(xs []logproto.Entry) ([]byte, error) {
- c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
+ c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for _, x := range xs {
if _, err := c.Append(&x); err != nil {
return nil, err
@@ -656,7 +656,7 @@ func TestReorder(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
+ c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for _, x := range tc.input {
dup, err := c.Append(&x)
require.False(t, dup)
@@ -675,7 +675,7 @@ func TestReorder(t *testing.T) {
}
func TestReorderAcrossBlocks(t *testing.T) {
- c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
+ c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for _, batch := range [][]int{
// ensure our blocks have overlapping bounds and must be reordered
// before closing.
diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go
index 0d75273d6c81e..bcbe9cc1e8be0 100644
--- a/pkg/chunkenc/util_test.go
+++ b/pkg/chunkenc/util_test.go
@@ -24,7 +24,7 @@ func logprotoEntryWithStructuredMetadata(ts int64, line string, structuredMetada
}
}
-func generateData(enc compression.Encoding, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) {
+func generateData(enc compression.Codec, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) {
chunks := []Chunk{}
i := int64(0)
size := uint64(0)
diff --git a/pkg/compactor/deletion/delete_requests_table.go b/pkg/compactor/deletion/delete_requests_table.go
index 7d4c5cf4d254a..ed748097e5ba4 100644
--- a/pkg/compactor/deletion/delete_requests_table.go
+++ b/pkg/compactor/deletion/delete_requests_table.go
@@ -117,7 +117,7 @@ func (t *deleteRequestsTable) uploadFile() error {
}()
err = t.db.View(func(tx *bbolt.Tx) (err error) {
- gzipPool := compression.GetWriterPool(compression.EncGZIP)
+ gzipPool := compression.GetWriterPool(compression.GZIP)
compressedWriter := gzipPool.GetWriter(f)
defer gzipPool.PutWriter(compressedWriter)
diff --git a/pkg/compactor/index_set.go b/pkg/compactor/index_set.go
index 76b5546a96289..481d6aa399375 100644
--- a/pkg/compactor/index_set.go
+++ b/pkg/compactor/index_set.go
@@ -229,7 +229,7 @@ func (is *indexSet) upload() error {
}
}()
- gzipPool := compression.GetWriterPool(compression.EncGZIP)
+ gzipPool := compression.GetWriterPool(compression.GZIP)
compressedWriter := gzipPool.GetWriter(f)
defer gzipPool.PutWriter(compressedWriter)
diff --git a/pkg/compactor/retention/retention_test.go b/pkg/compactor/retention/retention_test.go
index 32dac3293a097..b68d9e39f42c8 100644
--- a/pkg/compactor/retention/retention_test.go
+++ b/pkg/compactor/retention/retention_test.go
@@ -279,7 +279,7 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := ingesterclient.Fingerprint(lbs)
- chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize)
+ chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize)
for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) {
dup, err := chunkEnc.Append(&logproto.Entry{
diff --git a/pkg/compression/codec.go b/pkg/compression/codec.go
new file mode 100644
index 0000000000000..84038e50193a8
--- /dev/null
+++ b/pkg/compression/codec.go
@@ -0,0 +1,85 @@
+package compression
+
+import (
+ "fmt"
+ "strings"
+)
+
+// Codec identifies an available compression codec.
+type Codec byte
+
+// The different available codecs
+// Make sure to preserve the order, as the numeric values are serialized!
+//
+//nolint:revive
+const (
+ None Codec = iota
+ GZIP
+ Dumb // not supported
+ LZ4_64k
+ Snappy
+ LZ4_256k
+ LZ4_1M
+ LZ4_4M
+ Flate
+ Zstd
+)
+
+var supportedCodecs = []Codec{
+ None,
+ GZIP,
+ LZ4_64k,
+ Snappy,
+ LZ4_256k,
+ LZ4_1M,
+ LZ4_4M,
+ Flate,
+ Zstd,
+}
+
+func (e Codec) String() string {
+ switch e {
+ case GZIP:
+ return "gzip"
+ case None:
+ return "none"
+ case LZ4_64k:
+ return "lz4-64k"
+ case LZ4_256k:
+ return "lz4-256k"
+ case LZ4_1M:
+ return "lz4-1M"
+ case LZ4_4M:
+ return "lz4"
+ case Snappy:
+ return "snappy"
+ case Flate:
+ return "flate"
+ case Zstd:
+ return "zstd"
+ default:
+ return "unknown"
+ }
+}
+
+// ParseCodec parses a chunk encoding (compression codec) by its name.
+func ParseCodec(enc string) (Codec, error) {
+ for _, e := range supportedCodecs {
+ if strings.EqualFold(e.String(), enc) {
+ return e, nil
+ }
+ }
+ return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedCodecs())
+}
+
+// SupportedCodecs returns the list of supported Codecs.
+func SupportedCodecs() string {
+ var sb strings.Builder
+ for i := range supportedCodecs {
+ sb.WriteString(supportedCodecs[i].String())
+ if i != len(supportedCodecs)-1 {
+ sb.WriteString(", ")
+ }
+ }
+ return sb.String()
+}
diff --git a/pkg/compression/encoding_test.go b/pkg/compression/codec_test.go
similarity index 84%
rename from pkg/compression/encoding_test.go
rename to pkg/compression/codec_test.go
index d67323ebb2d4f..7d25b53380d6b 100644
--- a/pkg/compression/encoding_test.go
+++ b/pkg/compression/codec_test.go
@@ -5,15 +5,15 @@ import "testing"
func TestParseEncoding(t *testing.T) {
tests := []struct {
enc string
- want Encoding
+ want Codec
wantErr bool
}{
- {"gzip", EncGZIP, false},
+ {"gzip", GZIP, false},
{"bad", 0, true},
}
for _, tt := range tests {
t.Run(tt.enc, func(t *testing.T) {
- got, err := ParseEncoding(tt.enc)
+ got, err := ParseCodec(tt.enc)
if (err != nil) != tt.wantErr {
t.Errorf("ParseEncoding() error = %v, wantErr %v", err, tt.wantErr)
return
diff --git a/pkg/compression/encoding.go b/pkg/compression/encoding.go
deleted file mode 100644
index ecef31f09325b..0000000000000
--- a/pkg/compression/encoding.go
+++ /dev/null
@@ -1,83 +0,0 @@
-package compression
-
-import (
- "fmt"
- "strings"
-)
-
-// Encoding identifies an available compression type.
-type Encoding byte
-
-// The different available encodings.
-// Make sure to preserve the order, as the numeric values are serialized!
-const (
- EncNone Encoding = iota
- EncGZIP
- EncDumb // not supported
- EncLZ4_64k
- EncSnappy
- EncLZ4_256k
- EncLZ4_1M
- EncLZ4_4M
- EncFlate
- EncZstd
-)
-
-var supportedEncoding = []Encoding{
- EncNone,
- EncGZIP,
- EncLZ4_64k,
- EncSnappy,
- EncLZ4_256k,
- EncLZ4_1M,
- EncLZ4_4M,
- EncFlate,
- EncZstd,
-}
-
-func (e Encoding) String() string {
- switch e {
- case EncGZIP:
- return "gzip"
- case EncNone:
- return "none"
- case EncLZ4_64k:
- return "lz4-64k"
- case EncLZ4_256k:
- return "lz4-256k"
- case EncLZ4_1M:
- return "lz4-1M"
- case EncLZ4_4M:
- return "lz4"
- case EncSnappy:
- return "snappy"
- case EncFlate:
- return "flate"
- case EncZstd:
- return "zstd"
- default:
- return "unknown"
- }
-}
-
-// ParseEncoding parses an chunk encoding (compression algorithm) by its name.
-func ParseEncoding(enc string) (Encoding, error) {
- for _, e := range supportedEncoding {
- if strings.EqualFold(e.String(), enc) {
- return e, nil
- }
- }
- return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding())
-}
-
-// SupportedEncoding returns the list of supported Encoding.
-func SupportedEncoding() string {
- var sb strings.Builder
- for i := range supportedEncoding {
- sb.WriteString(supportedEncoding[i].String())
- if i != len(supportedEncoding)-1 {
- sb.WriteString(", ")
- }
- }
- return sb.String()
-}
diff --git a/pkg/compression/fileext.go b/pkg/compression/fileext.go
index 8cd09c392d082..d6cfa4431b72a 100644
--- a/pkg/compression/fileext.go
+++ b/pkg/compression/fileext.go
@@ -11,39 +11,39 @@ const (
ExtZstd = ".zst"
)
-func ToFileExtension(e Encoding) string {
+func ToFileExtension(e Codec) string {
switch e {
- case EncNone:
+ case None:
return ExtNone
- case EncGZIP:
+ case GZIP:
return ExtGZIP
- case EncLZ4_64k, EncLZ4_256k, EncLZ4_1M, EncLZ4_4M:
+ case LZ4_64k, LZ4_256k, LZ4_1M, LZ4_4M:
return ExtLZ4
- case EncSnappy:
+ case Snappy:
return ExtSnappy
- case EncFlate:
+ case Flate:
return ExtFlate
- case EncZstd:
+ case Zstd:
return ExtZstd
default:
- panic(fmt.Sprintf("invalid encoding: %d, supported: %s", e, SupportedEncoding()))
+ panic(fmt.Sprintf("invalid codec: %d, supported: %s", e, SupportedCodecs()))
}
}
-func FromFileExtension(ext string) Encoding {
+func FromFileExtension(ext string) Codec {
switch ext {
case ExtNone:
- return EncNone
+ return None
case ExtGZIP:
- return EncGZIP
+ return GZIP
case ExtLZ4:
- return EncLZ4_4M
+ return LZ4_4M
case ExtSnappy:
- return EncSnappy
+ return Snappy
case ExtFlate:
- return EncFlate
+ return Flate
case ExtZstd:
- return EncZstd
+ return Zstd
default:
panic(fmt.Sprintf("invalid file extension: %s", ext))
}
diff --git a/pkg/compression/pool.go b/pkg/compression/pool.go
index b68ff7de47b1c..5642875916512 100644
--- a/pkg/compression/pool.go
+++ b/pkg/compression/pool.go
@@ -51,33 +51,33 @@ var (
noop = NoopPool{}
)
-func GetWriterPool(enc Encoding) WriterPool {
+func GetWriterPool(enc Codec) WriterPool {
return GetPool(enc).(WriterPool)
}
-func GetReaderPool(enc Encoding) ReaderPool {
+func GetReaderPool(enc Codec) ReaderPool {
return GetPool(enc).(ReaderPool)
}
-func GetPool(enc Encoding) ReaderWriterPool {
+func GetPool(enc Codec) ReaderWriterPool {
switch enc {
- case EncGZIP:
+ case GZIP:
return &gzip
- case EncLZ4_64k:
+ case LZ4_64k:
return &lz4_64k
- case EncLZ4_256k:
+ case LZ4_256k:
return &lz4_256k
- case EncLZ4_1M:
+ case LZ4_1M:
return &lz4_1M
- case EncLZ4_4M:
+ case LZ4_4M:
return &lz4_4M
- case EncSnappy:
+ case Snappy:
return &snappy
- case EncNone:
+ case None:
return &noop
- case EncFlate:
+ case Flate:
return &flate
- case EncZstd:
+ case Zstd:
return &zstd
default:
panic("unknown encoding")
diff --git a/pkg/compression/pool_test.go b/pkg/compression/pool_test.go
index b39bbe0ad6f4d..fc5ba08a0d484 100644
--- a/pkg/compression/pool_test.go
+++ b/pkg/compression/pool_test.go
@@ -15,7 +15,7 @@ import (
)
func TestPool(t *testing.T) {
- for _, enc := range supportedEncoding {
+ for _, enc := range supportedCodecs {
enc := enc
t.Run(enc.String(), func(t *testing.T) {
var wg sync.WaitGroup
diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go
index 1639125390a07..9f1db601bb72d 100644
--- a/pkg/ingester/checkpoint_test.go
+++ b/pkg/ingester/checkpoint_test.go
@@ -566,7 +566,7 @@ func buildChunks(t testing.TB, size int) []Chunk {
for i := 0; i < size; i++ {
// build chunks of 256k blocks, 1.5MB target size. Same as default config.
- c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV3, compression.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024)
+ c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV3, compression.GZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024)
fillChunk(t, c)
descs = append(descs, chunkDesc{
chunk: c,
diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go
index 961b256ea58c6..6d1f1735469fd 100644
--- a/pkg/ingester/chunk_test.go
+++ b/pkg/ingester/chunk_test.go
@@ -50,7 +50,7 @@ func TestIterator(t *testing.T) {
}{
{"dumbChunk", chunkenc.NewDumbChunk},
{"gzipChunk", func() chunkenc.Chunk {
- return chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
+ return chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
}},
} {
t.Run(chk.name, func(t *testing.T) {
diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go
index ee2ad1d8f681a..3a730324e5c30 100644
--- a/pkg/ingester/encoding_test.go
+++ b/pkg/ingester/encoding_test.go
@@ -59,7 +59,7 @@ func Test_EncodingChunks(t *testing.T) {
t.Run(fmt.Sprintf("%v-%s", close, tc.desc), func(t *testing.T) {
conf := tc.conf
- c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
+ c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
fillChunk(t, c)
if close {
require.Nil(t, c.Close())
@@ -122,7 +122,7 @@ func Test_EncodingChunks(t *testing.T) {
func Test_EncodingCheckpoint(t *testing.T) {
conf := dummyConf()
- c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
+ c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
dup, err := c.Append(&logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "hi there",
diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go
index f01fb02e8730b..f4251747115a2 100644
--- a/pkg/ingester/flush_test.go
+++ b/pkg/ingester/flush_test.go
@@ -189,7 +189,7 @@ func buildChunkDecs(t testing.TB) []*chunkDesc {
for i := range res {
res[i] = &chunkDesc{
closed: true,
- chunk: chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
+ chunk: chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
}
fillChunk(t, res[i].chunk)
require.NoError(t, res[i].chunk.Close())
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 7776b9097f085..529336a58561c 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -89,18 +89,18 @@ var (
type Config struct {
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."`
- ConcurrentFlushes int `yaml:"concurrent_flushes"`
- FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
- FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
- FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
- RetainPeriod time.Duration `yaml:"chunk_retain_period"`
- MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
- BlockSize int `yaml:"chunk_block_size"`
- TargetChunkSize int `yaml:"chunk_target_size"`
- ChunkEncoding string `yaml:"chunk_encoding"`
- parsedEncoding compression.Encoding `yaml:"-"` // placeholder for validated encoding
- MaxChunkAge time.Duration `yaml:"max_chunk_age"`
- AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"`
+ ConcurrentFlushes int `yaml:"concurrent_flushes"`
+ FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
+ FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
+ FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
+ RetainPeriod time.Duration `yaml:"chunk_retain_period"`
+ MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
+ BlockSize int `yaml:"chunk_block_size"`
+ TargetChunkSize int `yaml:"chunk_target_size"`
+ ChunkEncoding string `yaml:"chunk_encoding"`
+ parsedEncoding compression.Codec `yaml:"-"` // placeholder for validated encoding
+ MaxChunkAge time.Duration `yaml:"max_chunk_age"`
+ AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"`
// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
SyncPeriod time.Duration `yaml:"sync_period"`
@@ -150,7 +150,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
f.IntVar(&cfg.TargetChunkSize, "ingester.chunk-target-size", 1572864, "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.") // 1.5 MB
- f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", compression.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedEncoding()))
+ f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", compression.GZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedCodecs()))
f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 1*time.Hour, "Parameters used to synchronize ingesters to cut chunks at the same moment. Sync period is used to roll over incoming entry to a new chunk. If chunk's utilization isn't high enough (eg. less than 50% when sync_min_utilization is set to 0.5), then this chunk rollover doesn't happen.")
f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0.1, "Minimum utilization of chunk when doing synchronization.")
f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "The maximum number of errors a stream will report to the user when a push fails. 0 to make unlimited.")
@@ -164,7 +164,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
}
func (cfg *Config) Validate() error {
- enc, err := compression.ParseEncoding(cfg.ChunkEncoding)
+ enc, err := compression.ParseCodec(cfg.ChunkEncoding)
if err != nil {
return err
}
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index a9108c52c2a14..9074580b4eb40 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -697,7 +697,7 @@ func TestValidate(t *testing.T) {
}{
{
in: Config{
- ChunkEncoding: compression.EncGZIP.String(),
+ ChunkEncoding: compression.GZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@@ -708,7 +708,7 @@ func TestValidate(t *testing.T) {
MaxChunkAge: time.Minute,
},
expected: Config{
- ChunkEncoding: compression.EncGZIP.String(),
+ ChunkEncoding: compression.GZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@@ -717,12 +717,12 @@ func TestValidate(t *testing.T) {
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
MaxChunkAge: time.Minute,
- parsedEncoding: compression.EncGZIP,
+ parsedEncoding: compression.GZIP,
},
},
{
in: Config{
- ChunkEncoding: compression.EncSnappy.String(),
+ ChunkEncoding: compression.Snappy.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@@ -732,7 +732,7 @@ func TestValidate(t *testing.T) {
IndexShards: index.DefaultIndexShards,
},
expected: Config{
- ChunkEncoding: compression.EncSnappy.String(),
+ ChunkEncoding: compression.Snappy.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@@ -740,7 +740,7 @@ func TestValidate(t *testing.T) {
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
- parsedEncoding: compression.EncSnappy,
+ parsedEncoding: compression.Snappy,
},
},
{
@@ -758,7 +758,7 @@ func TestValidate(t *testing.T) {
},
{
in: Config{
- ChunkEncoding: compression.EncGZIP.String(),
+ ChunkEncoding: compression.GZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@@ -771,7 +771,7 @@ func TestValidate(t *testing.T) {
},
{
in: Config{
- ChunkEncoding: compression.EncGZIP.String(),
+ ChunkEncoding: compression.GZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@@ -784,7 +784,7 @@ func TestValidate(t *testing.T) {
},
{
in: Config{
- ChunkEncoding: compression.EncGZIP.String(),
+ ChunkEncoding: compression.GZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go
index 9ac86fbd30155..03e0ca976628f 100644
--- a/pkg/ingester/stream_test.go
+++ b/pkg/ingester/stream_test.go
@@ -277,7 +277,7 @@ func TestStreamIterator(t *testing.T) {
{"gzipChunk", func() *chunkenc.MemChunk {
chunkfmt, headfmt := defaultChunkFormat(t)
- return chunkenc.NewMemChunk(chunkfmt, compression.EncGZIP, headfmt, 256*1024, 0)
+ return chunkenc.NewMemChunk(chunkfmt, compression.GZIP, headfmt, 256*1024, 0)
}},
} {
t.Run(chk.name, func(t *testing.T) {
diff --git a/pkg/storage/bloom/v1/archive.go b/pkg/storage/bloom/v1/archive.go
index fce83d69e41d9..a7b7232f230d1 100644
--- a/pkg/storage/bloom/v1/archive.go
+++ b/pkg/storage/bloom/v1/archive.go
@@ -21,7 +21,7 @@ type TarEntry struct {
Body io.ReadSeeker
}
-func TarCompress(enc compression.Encoding, dst io.Writer, reader BlockReader) error {
+func TarCompress(enc compression.Codec, dst io.Writer, reader BlockReader) error {
comprPool := compression.GetWriterPool(enc)
comprWriter := comprPool.GetWriter(dst)
defer func() {
@@ -61,7 +61,7 @@ func Tar(dst io.Writer, reader BlockReader) error {
return itr.Err()
}
-func UnTarCompress(enc compression.Encoding, dst string, r io.Reader) error {
+func UnTarCompress(enc compression.Codec, dst string, r io.Reader) error {
comprPool := compression.GetReaderPool(enc)
comprReader, err := comprPool.GetReader(r)
if err != nil {
diff --git a/pkg/storage/bloom/v1/archive_test.go b/pkg/storage/bloom/v1/archive_test.go
index b7857a4b5ed11..f91039cac3691 100644
--- a/pkg/storage/bloom/v1/archive_test.go
+++ b/pkg/storage/bloom/v1/archive_test.go
@@ -24,7 +24,7 @@ func TestArchive(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncNone,
+ encoding: compression.None,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@@ -82,17 +82,17 @@ func TestArchive(t *testing.T) {
func TestArchiveCompression(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
- enc compression.Encoding
+ enc compression.Codec
}{
- {compression.EncNone},
- {compression.EncGZIP},
- {compression.EncSnappy},
- {compression.EncLZ4_64k},
- {compression.EncLZ4_256k},
- {compression.EncLZ4_1M},
- {compression.EncLZ4_4M},
- {compression.EncFlate},
- {compression.EncZstd},
+ {compression.None},
+ {compression.GZIP},
+ {compression.Snappy},
+ {compression.LZ4_64k},
+ {compression.LZ4_256k},
+ {compression.LZ4_1M},
+ {compression.LZ4_4M},
+ {compression.Flate},
+ {compression.Zstd},
} {
t.Run(tc.enc.String(), func(t *testing.T) {
// for writing files to two dirs for comparison and ensuring they're equal
@@ -106,7 +106,7 @@ func TestArchiveCompression(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncNone,
+ encoding: compression.None,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go
index b77af18d1aceb..82c85bd9f441b 100644
--- a/pkg/storage/bloom/v1/bloom.go
+++ b/pkg/storage/bloom/v1/bloom.go
@@ -316,7 +316,7 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc mempool.Allocator,
return nil, false, errors.Wrap(err, "seeking to bloom page")
}
- if b.schema.encoding == compression.EncNone {
+ if b.schema.encoding == compression.None {
res, err = LazyDecodeBloomPageNoCompression(r, alloc, page)
} else {
res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page)
diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go
index 79eb74033dd74..f4c7ec7d831c4 100644
--- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go
+++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go
@@ -38,7 +38,7 @@ func TestTokenizerPopulate(t *testing.T) {
{Name: "pod", Value: "loki-1"},
{Name: "trace_id", Value: "3bef3c91643bde73"},
}
- memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
+ memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
_, _ = memChunk.Append(&push.Entry{
Timestamp: time.Unix(0, 1),
Line: testLine,
@@ -83,7 +83,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) {
{Name: "pod", Value: "loki-1"},
{Name: "trace_id", Value: "3bef3c91643bde73"},
}
- memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
+ memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
_, _ = memChunk.Append(&push.Entry{
Timestamp: time.Unix(0, 1),
Line: testLine,
@@ -120,7 +120,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) {
}
func chunkRefItrFromMetadata(metadata ...push.LabelsAdapter) (iter.EntryIterator, error) {
- memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
+ memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
for i, md := range metadata {
if _, err := memChunk.Append(&push.Entry{
Timestamp: time.Unix(0, int64(i)),
@@ -205,7 +205,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
- memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
+ memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
_, _ = memChunk.Append(&push.Entry{
Timestamp: time.Unix(0, 1),
Line: "",
diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go
index 664eb60cd596f..f4bec3b2eaadb 100644
--- a/pkg/storage/bloom/v1/builder.go
+++ b/pkg/storage/bloom/v1/builder.go
@@ -66,7 +66,7 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) {
enc.PutBE64(b.BlockSize)
}
-func NewBlockOptions(enc compression.Encoding, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions {
+func NewBlockOptions(enc compression.Codec, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions {
opts := NewBlockOptionsFromSchema(Schema{
version: CurrentSchemaVersion,
encoding: enc,
diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go
index a2682921930f8..81c367df9c811 100644
--- a/pkg/storage/bloom/v1/builder_test.go
+++ b/pkg/storage/bloom/v1/builder_test.go
@@ -15,12 +15,12 @@ import (
"github.com/grafana/loki/v3/pkg/util/mempool"
)
-var blockEncodings = []compression.Encoding{
- compression.EncNone,
- compression.EncGZIP,
- compression.EncSnappy,
- compression.EncLZ4_256k,
- compression.EncZstd,
+var blockEncodings = []compression.Codec{
+ compression.None,
+ compression.GZIP,
+ compression.Snappy,
+ compression.LZ4_256k,
+ compression.Zstd,
}
func TestBlockOptions_RoundTrip(t *testing.T) {
@@ -28,7 +28,7 @@ func TestBlockOptions_RoundTrip(t *testing.T) {
opts := BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncSnappy,
+ encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@@ -201,7 +201,7 @@ func TestMergeBuilder(t *testing.T) {
blockOpts := BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncSnappy,
+ encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@@ -298,7 +298,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {
blockOpts := BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncSnappy,
+ encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@@ -395,7 +395,7 @@ func TestBlockReset(t *testing.T) {
schema := Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncSnappy,
+ encoding: compression.Snappy,
}
builder, err := NewBlockBuilder(
@@ -451,7 +451,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
blockOpts := BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncSnappy, // test with different encodings?
+ encoding: compression.Snappy, // test with different encodings?
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go
index ec4f575fc22a8..4a22b91e70099 100644
--- a/pkg/storage/bloom/v1/fuse_test.go
+++ b/pkg/storage/bloom/v1/fuse_test.go
@@ -60,7 +60,7 @@ func TestFusedQuerier(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncSnappy,
+ encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@@ -147,7 +147,7 @@ func TestFusedQuerier_MultiPage(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncSnappy,
+ encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10, // So we force one bloom per page
@@ -296,7 +296,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncSnappy,
+ encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10, // So we force one series per page
@@ -354,7 +354,7 @@ func TestFusedQuerier_SkipsEmptyBlooms(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncNone,
+ encoding: compression.None,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@@ -415,7 +415,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncSnappy,
+ encoding: compression.Snappy,
},
SeriesPageSize: 256 << 10, // 256k
BloomPageSize: 1 << 20, // 1MB
diff --git a/pkg/storage/bloom/v1/schema.go b/pkg/storage/bloom/v1/schema.go
index 954c96f757d6c..7c0271434b2b4 100644
--- a/pkg/storage/bloom/v1/schema.go
+++ b/pkg/storage/bloom/v1/schema.go
@@ -39,13 +39,13 @@ var (
type Schema struct {
version Version
- encoding compression.Encoding
+ encoding compression.Codec
}
func NewSchema() Schema {
return Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncNone,
+ encoding: compression.None,
}
}
@@ -105,8 +105,8 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error {
return errors.Errorf("invalid version. expected %d, got %d", 3, s.version)
}
- s.encoding = compression.Encoding(dec.Byte())
- if _, err := compression.ParseEncoding(s.encoding.String()); err != nil {
+ s.encoding = compression.Codec(dec.Byte())
+ if _, err := compression.ParseCodec(s.encoding.String()); err != nil {
return errors.Wrap(err, "parsing encoding")
}
diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go
index e8997a8cc2419..4d036ba4809df 100644
--- a/pkg/storage/bloom/v1/test_util.go
+++ b/pkg/storage/bloom/v1/test_util.go
@@ -30,7 +30,7 @@ func MakeBlock(t testing.TB, nth int, fromFp, throughFp model.Fingerprint, fromT
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
- encoding: compression.EncSnappy,
+ encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
diff --git a/pkg/storage/bloom/v1/versioned_builder_test.go b/pkg/storage/bloom/v1/versioned_builder_test.go
index 07240fe603586..9154daf77fc77 100644
--- a/pkg/storage/bloom/v1/versioned_builder_test.go
+++ b/pkg/storage/bloom/v1/versioned_builder_test.go
@@ -14,7 +14,7 @@ import (
// smallBlockOpts returns a set of block options that are suitable for testing
// characterized by small page sizes
-func smallBlockOpts(v Version, enc compression.Encoding) BlockOptions {
+func smallBlockOpts(v Version, enc compression.Codec) BlockOptions {
return BlockOptions{
Schema: Schema{
version: v,
@@ -33,7 +33,7 @@ func setup(v Version) (BlockOptions, []SeriesWithBlooms, BlockWriter, BlockReade
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := NewByteReader(indexBuf, bloomsBuf)
- return smallBlockOpts(v, compression.EncNone), data, writer, reader
+ return smallBlockOpts(v, compression.None), data, writer, reader
}
func TestV3Roundtrip(t *testing.T) {
diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go
index 2f236c1f40e48..3ff473934cdb1 100644
--- a/pkg/storage/chunk/cache/cache_test.go
+++ b/pkg/storage/chunk/cache/cache_test.go
@@ -35,7 +35,7 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str
for i := 0; i < 111; i++ {
ts := model.TimeFromUnix(int64(i * chunkLen))
- cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
+ cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
_, err := cs.Append(&logproto.Entry{
Timestamp: ts.Time(),
diff --git a/pkg/storage/chunk/client/grpc/grpc_client_test.go b/pkg/storage/chunk/client/grpc/grpc_client_test.go
index d40d825a94428..2c33c29c15b71 100644
--- a/pkg/storage/chunk/client/grpc/grpc_client_test.go
+++ b/pkg/storage/chunk/client/grpc/grpc_client_test.go
@@ -82,7 +82,7 @@ func TestGrpcStore(t *testing.T) {
newChunkData := func() chunk.Data {
return chunkenc.NewFacade(
chunkenc.NewMemChunk(
- chunkenc.ChunkFormatV3, compression.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0,
+ chunkenc.ChunkFormatV3, compression.None, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0,
), 0, 0)
}
diff --git a/pkg/storage/chunk/client/testutils/testutils.go b/pkg/storage/chunk/client/testutils/testutils.go
index e436c1335f212..ad0e0a8de2e6f 100644
--- a/pkg/storage/chunk/client/testutils/testutils.go
+++ b/pkg/storage/chunk/client/testutils/testutils.go
@@ -87,7 +87,7 @@ func CreateChunks(scfg config.SchemaConfig, startIndex, batchSize int, from mode
}
func DummyChunkFor(from, through model.Time, metric labels.Labels) chunk.Chunk {
- cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
+ cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
for ts := from; ts <= through; ts = ts.Add(15 * time.Second) {
_, err := cs.Append(&logproto.Entry{Timestamp: ts.Time(), Line: fmt.Sprintf("line ts=%d", ts)})
diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go
index 58123957919bd..27fc5a124e464 100644
--- a/pkg/storage/chunk/fetcher/fetcher_test.go
+++ b/pkg/storage/chunk/fetcher/fetcher_test.go
@@ -312,7 +312,7 @@ func makeChunks(now time.Time, tpls ...c) []chunk.Chunk {
from := int(chk.from) / int(time.Hour)
// This is only here because it's helpful for debugging.
// This isn't even the write format for Loki but we dont' care for the sake of these tests.
- memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
+ memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.None, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
// To make sure the fetcher doesn't swap keys and buffers each chunk is built with different, but deterministic data
for i := 0; i < from; i++ {
_, _ = memChk.Append(&logproto.Entry{
diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go
index b2d01d2e41e07..4e6c348ceb3e0 100644
--- a/pkg/storage/hack/main.go
+++ b/pkg/storage/hack/main.go
@@ -104,7 +104,7 @@ func fillStore(cm storage.ClientMetrics) error {
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := client.Fingerprint(lbs)
- chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.EncLZ4_4M, headfmt, 262144, 1572864)
+ chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.LZ4_4M, headfmt, 262144, 1572864)
for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() {
entry := &logproto.Entry{
Timestamp: time.Unix(0, ts),
@@ -127,7 +127,7 @@ func fillStore(cm storage.ClientMetrics) error {
if flushCount >= maxChunks {
return
}
- chunkEnc = chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncLZ4_64k, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 262144, 1572864)
+ chunkEnc = chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.LZ4_64k, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 262144, 1572864)
}
}
}(i)
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index b1493089750a9..197fcfa6f1e90 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -2037,7 +2037,7 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) {
metric := labelsBuilder.Labels()
fp := client.Fingerprint(lbs)
- chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.EncLZ4_4M, headfmt, 262144, 1572864)
+ chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.LZ4_4M, headfmt, 262144, 1572864)
for ts := chkFrom; !ts.After(chkThrough); ts = ts.Add(time.Second) {
entry := logproto.Entry{
Timestamp: ts,
diff --git a/pkg/storage/stores/series/series_store_test.go b/pkg/storage/stores/series/series_store_test.go
index 3bd136cb3b619..d64fd70b25b59 100644
--- a/pkg/storage/stores/series/series_store_test.go
+++ b/pkg/storage/stores/series/series_store_test.go
@@ -753,7 +753,7 @@ func dummyChunkWithFormat(t testing.TB, now model.Time, metric labels.Labels, fo
samples := 1
chunkStart := now.Add(-time.Hour)
- chk := chunkenc.NewMemChunk(format, compression.EncGZIP, headfmt, 256*1024, 0)
+ chk := chunkenc.NewMemChunk(format, compression.GZIP, headfmt, 256*1024, 0)
for i := 0; i < samples; i++ {
ts := time.Duration(i) * 15 * time.Second
dup, err := chk.Append(&logproto.Entry{Timestamp: chunkStart.Time().Add(ts), Line: fmt.Sprintf("line %d", i)})
diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go
index a24608675a3d0..5ff8a00d99706 100644
--- a/pkg/storage/stores/series_store_write_test.go
+++ b/pkg/storage/stores/series_store_write_test.go
@@ -93,7 +93,7 @@ func TestChunkWriter_PutOne(t *testing.T) {
chunkfmt, headfmt, err := periodConfig.ChunkFormat()
require.NoError(t, err)
- memchk := chunkenc.NewMemChunk(chunkfmt, compression.EncGZIP, headfmt, 256*1024, 0)
+ memchk := chunkenc.NewMemChunk(chunkfmt, compression.GZIP, headfmt, 256*1024, 0)
chk := chunk.NewChunk("fake", model.Fingerprint(0), []labels.Label{{Name: "foo", Value: "bar"}}, chunkenc.NewFacade(memchk, 0, 0), 100, 400)
for name, tc := range map[string]struct {
diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go
index 1390b0d9c52e8..1c66e500a6b9c 100644
--- a/pkg/storage/stores/shipper/bloomshipper/client.go
+++ b/pkg/storage/stores/shipper/bloomshipper/client.go
@@ -73,7 +73,7 @@ func (r Ref) Interval() Interval {
type BlockRef struct {
Ref
- compression.Encoding
+ compression.Codec
}
func (r BlockRef) String() string {
@@ -220,17 +220,17 @@ func newRefFrom(tenant, table string, md v1.BlockMetadata) Ref {
}
}
-func newBlockRefWithEncoding(ref Ref, enc compression.Encoding) BlockRef {
- return BlockRef{Ref: ref, Encoding: enc}
+func newBlockRefWithEncoding(ref Ref, enc compression.Codec) BlockRef {
+ return BlockRef{Ref: ref, Codec: enc}
}
-func BlockFrom(enc compression.Encoding, tenant, table string, blk *v1.Block) (Block, error) {
+func BlockFrom(enc compression.Codec, tenant, table string, blk *v1.Block) (Block, error) {
md, _ := blk.Metadata()
ref := newBlockRefWithEncoding(newRefFrom(tenant, table, md), enc)
// TODO(owen-d): pool
buf := bytes.NewBuffer(nil)
- err := v1.TarCompress(ref.Encoding, buf, blk.Reader())
+ err := v1.TarCompress(ref.Codec, buf, blk.Reader())
if err != nil {
return Block{}, err
@@ -330,7 +330,7 @@ func (b *BloomClient) GetBlock(ctx context.Context, ref BlockRef) (BlockDirector
return BlockDirectory{}, fmt.Errorf("failed to create block directory %s: %w", path, err)
}
- err = v1.UnTarCompress(ref.Encoding, path, rc)
+ err = v1.UnTarCompress(ref.Codec, path, rc)
if err != nil {
return BlockDirectory{}, fmt.Errorf("failed to extract block file %s: %w", key, err)
}
diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go
index 13ce7a7c97ae6..04897e897ff67 100644
--- a/pkg/storage/stores/shipper/bloomshipper/client_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go
@@ -21,16 +21,16 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
)
-var supportedCompressions = []compression.Encoding{
- compression.EncNone,
- compression.EncGZIP,
- compression.EncSnappy,
- compression.EncLZ4_64k,
- compression.EncLZ4_256k,
- compression.EncLZ4_1M,
- compression.EncLZ4_4M,
- compression.EncFlate,
- compression.EncZstd,
+var supportedCompressions = []compression.Codec{
+ compression.None,
+ compression.GZIP,
+ compression.Snappy,
+ compression.LZ4_64k,
+ compression.LZ4_256k,
+ compression.LZ4_1M,
+ compression.LZ4_4M,
+ compression.Flate,
+ compression.Zstd,
}
func parseTime(s string) model.Time {
@@ -209,7 +209,7 @@ func TestBloomClient_DeleteMetas(t *testing.T) {
})
}
-func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint, enc compression.Encoding) (Block, error) {
+func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint, enc compression.Codec) (Block, error) {
step := int64((24 * time.Hour).Seconds())
day := start.Unix() / step
@@ -234,7 +234,7 @@ func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, min
StartTimestamp: start,
EndTimestamp: start.Add(12 * time.Hour),
},
- Encoding: enc,
+ Codec: enc,
},
Data: fp,
}
@@ -273,9 +273,9 @@ func TestBloomClient_GetBlocks(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
- b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff, compression.EncGZIP)
+ b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff, compression.GZIP)
require.NoError(t, err)
- b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff, compression.EncNone)
+ b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff, compression.None)
require.NoError(t, err)
t.Run("exists", func(t *testing.T) {
@@ -318,7 +318,7 @@ func TestBloomClient_PutBlock(t *testing.T) {
StartTimestamp: start,
EndTimestamp: start.Add(12 * time.Hour),
},
- Encoding: enc,
+ Codec: enc,
},
Data: fp,
}
@@ -343,11 +343,11 @@ func TestBloomClient_DeleteBlocks(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
- b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff, compression.EncNone)
+ b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff, compression.None)
require.NoError(t, err)
- b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff, compression.EncGZIP)
+ b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff, compression.GZIP)
require.NoError(t, err)
- b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff, compression.EncSnappy)
+ b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff, compression.Snappy)
require.NoError(t, err)
oc := c.client.(*testutils.InMemoryObjectClient)
diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
index 9361c35e90ebd..6c60c64b5f2df 100644
--- a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
@@ -329,11 +329,11 @@ func TestFetcher_LoadBlocksFromFS(t *testing.T) {
refs := []BlockRef{
// no directory for block
- {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x0000, 0x0fff)}, Encoding: compression.EncNone},
+ {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x0000, 0x0fff)}, Codec: compression.None},
// invalid directory for block
- {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x1000, 0x1fff)}, Encoding: compression.EncSnappy},
+ {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x1000, 0x1fff)}, Codec: compression.Snappy},
// valid directory for block
- {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}, Encoding: compression.EncGZIP},
+ {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}, Codec: compression.GZIP},
}
dirs := []string{
localFilePathWithoutExtension(refs[0], resolver),
diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver.go b/pkg/storage/stores/shipper/bloomshipper/resolver.go
index 3115f731fe13f..f101b55896a1a 100644
--- a/pkg/storage/stores/shipper/bloomshipper/resolver.go
+++ b/pkg/storage/stores/shipper/bloomshipper/resolver.go
@@ -81,7 +81,7 @@ func (defaultKeyResolver) ParseMetaKey(loc Location) (MetaRef, error) {
}
func (defaultKeyResolver) Block(ref BlockRef) Location {
- ext := blockExtension + compression.ToFileExtension(ref.Encoding)
+ ext := blockExtension + compression.ToFileExtension(ref.Codec)
return simpleLocation{
BloomPrefix,
fmt.Sprintf("%v", ref.TableName),
@@ -95,7 +95,7 @@ func (defaultKeyResolver) Block(ref BlockRef) Location {
func (defaultKeyResolver) ParseBlockKey(loc Location) (BlockRef, error) {
dir, fn := path.Split(loc.Addr())
- ext, enc := path.Ext(fn), compression.EncNone
+ ext, enc := path.Ext(fn), compression.None
if ext != "" && ext != blockExtension {
// trim compression extension
fn = strings.TrimSuffix(fn, ext)
@@ -142,7 +142,7 @@ func (defaultKeyResolver) ParseBlockKey(loc Location) (BlockRef, error) {
EndTimestamp: interval.End,
Checksum: uint32(checksum),
},
- Encoding: enc,
+ Codec: enc,
}, nil
}
@@ -286,9 +286,9 @@ func (ls locations) LocalPath() string {
}
func cacheKey(ref BlockRef) string {
- return strings.TrimSuffix(defaultKeyResolver{}.Block(ref).Addr(), blockExtension+compression.ToFileExtension(ref.Encoding))
+ return strings.TrimSuffix(defaultKeyResolver{}.Block(ref).Addr(), blockExtension+compression.ToFileExtension(ref.Codec))
}
func localFilePathWithoutExtension(ref BlockRef, res KeyResolver) string {
- return strings.TrimSuffix(res.Block(ref).LocalPath(), blockExtension+compression.ToFileExtension(ref.Encoding))
+ return strings.TrimSuffix(res.Block(ref).LocalPath(), blockExtension+compression.ToFileExtension(ref.Codec))
}
diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver_test.go b/pkg/storage/stores/shipper/bloomshipper/resolver_test.go
index 259bf7b2db3a3..21da2e4e69091 100644
--- a/pkg/storage/stores/shipper/bloomshipper/resolver_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/resolver_test.go
@@ -33,17 +33,17 @@ func TestResolver_ParseMetaKey(t *testing.T) {
func TestResolver_ParseBlockKey(t *testing.T) {
for _, tc := range []struct {
- srcEnc, dstEnc compression.Encoding
+ srcEnc, dstEnc compression.Codec
}{
- {compression.EncNone, compression.EncNone},
- {compression.EncGZIP, compression.EncGZIP},
- {compression.EncSnappy, compression.EncSnappy},
- {compression.EncLZ4_64k, compression.EncLZ4_4M},
- {compression.EncLZ4_256k, compression.EncLZ4_4M},
- {compression.EncLZ4_1M, compression.EncLZ4_4M},
- {compression.EncLZ4_4M, compression.EncLZ4_4M},
- {compression.EncFlate, compression.EncFlate},
- {compression.EncZstd, compression.EncZstd},
+ {compression.None, compression.None},
+ {compression.GZIP, compression.GZIP},
+ {compression.Snappy, compression.Snappy},
+ {compression.LZ4_64k, compression.LZ4_4M},
+ {compression.LZ4_256k, compression.LZ4_4M},
+ {compression.LZ4_1M, compression.LZ4_4M},
+ {compression.LZ4_4M, compression.LZ4_4M},
+ {compression.Flate, compression.Flate},
+ {compression.Zstd, compression.Zstd},
} {
t.Run(tc.srcEnc.String(), func(t *testing.T) {
r := defaultKeyResolver{}
@@ -56,7 +56,7 @@ func TestResolver_ParseBlockKey(t *testing.T) {
EndTimestamp: 3600000,
Checksum: 43981,
},
- Encoding: tc.srcEnc,
+ Codec: tc.srcEnc,
}
// encode block ref as string
@@ -69,8 +69,8 @@ func TestResolver_ParseBlockKey(t *testing.T) {
parsed, err := r.ParseBlockKey(key(path))
require.NoError(t, err)
expected := BlockRef{
- Ref: ref.Ref,
- Encoding: tc.dstEnc,
+ Ref: ref.Ref,
+ Codec: tc.dstEnc,
}
require.Equal(t, expected, parsed)
})
diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go
index 674e0c02a506b..92adcf9249226 100644
--- a/pkg/storage/stores/shipper/bloomshipper/store_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go
@@ -116,7 +116,7 @@ func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start
err := blockWriter.Init()
require.NoError(t, err)
- enc := compression.EncGZIP
+ enc := compression.GZIP
err = v1.TarCompress(enc, fp, v1.NewDirectoryBlockReader(tmpDir))
require.NoError(t, err)
@@ -130,7 +130,7 @@ func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start
StartTimestamp: start,
EndTimestamp: start.Add(12 * time.Hour),
},
- Encoding: enc,
+ Codec: enc,
},
Data: fp,
}
diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go
index a7ea7af3b05ef..04948c38a17c1 100644
--- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go
+++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go
@@ -32,7 +32,7 @@ func createChunk(t testing.TB, chunkFormat byte, headBlockFmt chunkenc.HeadBlock
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := ingesterclient.Fingerprint(lbs)
- chunkEnc := chunkenc.NewMemChunk(chunkFormat, compression.EncSnappy, headBlockFmt, blockSize, targetSize)
+ chunkEnc := chunkenc.NewMemChunk(chunkFormat, compression.Snappy, headBlockFmt, blockSize, targetSize)
for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) {
dup, err := chunkEnc.Append(&logproto.Entry{
diff --git a/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go b/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go
index 36dc138509564..d0d04a5104a5f 100644
--- a/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go
+++ b/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go
@@ -145,7 +145,7 @@ func (t *indexSet) uploadIndex(ctx context.Context, idx index.Index) error {
}
}()
- gzipPool := compression.GetWriterPool(compression.EncGZIP)
+ gzipPool := compression.GetWriterPool(compression.GZIP)
compressedWriter := gzipPool.GetWriter(f)
defer gzipPool.PutWriter(compressedWriter)
diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go
index dd535197afb3f..a0dc75999692f 100644
--- a/pkg/storage/util_test.go
+++ b/pkg/storage/util_test.go
@@ -109,7 +109,7 @@ func newChunk(chunkFormat byte, headBlockFmt chunkenc.HeadBlockFmt, stream logpr
lbs = builder.Labels()
}
from, through := loki_util.RoundToMilliseconds(stream.Entries[0].Timestamp, stream.Entries[len(stream.Entries)-1].Timestamp)
- chk := chunkenc.NewMemChunk(chunkFormat, compression.EncGZIP, headBlockFmt, 256*1024, 0)
+ chk := chunkenc.NewMemChunk(chunkFormat, compression.GZIP, headBlockFmt, 256*1024, 0)
for _, e := range stream.Entries {
_, _ = chk.Append(&e)
}
diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go
index 153073b74e6c2..9b362f20704f4 100644
--- a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -490,7 +490,7 @@ func (l *Limits) Validate() error {
return errors.Wrap(err, "invalid tsdb sharding strategy")
}
- if _, err := compression.ParseEncoding(l.BloomBlockEncoding); err != nil {
+ if _, err := compression.ParseCodec(l.BloomBlockEncoding); err != nil {
return err
}
diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go
index 87fab6837029c..19278c77a342f 100644
--- a/pkg/validation/limits_test.go
+++ b/pkg/validation/limits_test.go
@@ -339,7 +339,7 @@ func TestLimitsValidation(t *testing.T) {
},
{
limits: Limits{DeletionMode: "disabled", BloomBlockEncoding: "unknown"},
- expected: fmt.Errorf("invalid encoding: unknown, supported: %s", compression.SupportedEncoding()),
+ expected: fmt.Errorf("invalid encoding: unknown, supported: %s", compression.SupportedCodecs()),
},
} {
desc := fmt.Sprintf("%s/%s", tc.limits.DeletionMode, tc.limits.BloomBlockEncoding)
diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md
index ded3ea53196e7..eb12c55c33fc8 100644
--- a/production/helm/loki/CHANGELOG.md
+++ b/production/helm/loki/CHANGELOG.md
@@ -13,6 +13,10 @@ Entries should include a reference to the pull request that introduced the chang
[//]: # ( : do not remove this line. This locator is used by the CI pipeline to automatically create a changelog entry for each new Loki release. Add other chart versions and respective changelog entries bellow this line.)
+## 6.16.0
+
+- [ENHANCEMENT] Allow setting nodeSelector, tolerations and affinity to enterprise components (tokengen and provisioner).
+
## 6.15.0
- [ENHANCEMENT] Allow setting annotations for memberlist and query-scheduler-discovery services
diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml
index 56e48a535e070..69baa3b24ea06 100644
--- a/production/helm/loki/Chart.yaml
+++ b/production/helm/loki/Chart.yaml
@@ -3,7 +3,7 @@ name: loki
description: Helm chart for Grafana Loki and Grafana Enterprise Logs supporting both simple, scalable and distributed modes.
type: application
appVersion: 3.1.1
-version: 6.15.0
+version: 6.16.0
home: https://grafana.github.io/helm-charts
sources:
- https://github.com/grafana/loki
diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md
index 92008e768d6bc..235c31643d103 100644
--- a/production/helm/loki/README.md
+++ b/production/helm/loki/README.md
@@ -1,6 +1,6 @@
# loki
-![Version: 6.15.0](https://img.shields.io/badge/Version-6.15.0-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 3.1.1](https://img.shields.io/badge/AppVersion-3.1.1-informational?style=flat-square)
+![Version: 6.16.0](https://img.shields.io/badge/Version-6.16.0-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 3.1.1](https://img.shields.io/badge/AppVersion-3.1.1-informational?style=flat-square)
Helm chart for Grafana Loki and Grafana Enterprise Logs supporting both simple, scalable and distributed modes.
diff --git a/production/helm/loki/templates/provisioner/job-provisioner.yaml b/production/helm/loki/templates/provisioner/job-provisioner.yaml
index 61fd2be8501ba..5a6bc063715f4 100644
--- a/production/helm/loki/templates/provisioner/job-provisioner.yaml
+++ b/production/helm/loki/templates/provisioner/job-provisioner.yaml
@@ -123,6 +123,18 @@ spec:
{{- end }}
- name: bootstrap
mountPath: /bootstrap
+ {{- with .Values.enterprise.provisioner.affinity }}
+ affinity:
+ {{- toYaml . | nindent 8 }}
+ {{- end }}
+ {{- with .Values.enterprise.provisioner.nodeSelector }}
+ nodeSelector:
+ {{- toYaml . | nindent 8 }}
+ {{- end }}
+ {{- with .Values.enterprise.provisioner.tolerations }}
+ tolerations:
+ {{- toYaml . | nindent 8 }}
+ {{- end }}
restartPolicy: OnFailure
serviceAccount: {{ include "enterprise-logs.provisionerFullname" . }}
serviceAccountName: {{ include "enterprise-logs.provisionerFullname" . }}
diff --git a/production/helm/loki/templates/tokengen/job-tokengen.yaml b/production/helm/loki/templates/tokengen/job-tokengen.yaml
index f9ae7374c2d17..b0950d6f19675 100644
--- a/production/helm/loki/templates/tokengen/job-tokengen.yaml
+++ b/production/helm/loki/templates/tokengen/job-tokengen.yaml
@@ -110,6 +110,14 @@ spec:
restartPolicy: OnFailure
serviceAccount: {{ template "enterprise-logs.tokengenFullname" . }}
serviceAccountName: {{ template "enterprise-logs.tokengenFullname" . }}
+ {{- with .Values.enterprise.tokengen.affinity }}
+ affinity:
+ {{- toYaml . | nindent 8 }}
+ {{- end }}
+ {{- with .Values.enterprise.tokengen.nodeSelector }}
+ nodeSelector:
+ {{- toYaml . | nindent 8 }}
+ {{- end }}
{{- with .Values.enterprise.tokengen.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
diff --git a/production/helm/loki/values.yaml b/production/helm/loki/values.yaml
index ada05d7456068..4f2f26174986f 100644
--- a/production/helm/loki/values.yaml
+++ b/production/helm/loki/values.yaml
@@ -540,6 +540,10 @@ enterprise:
labels: {}
# -- Additional annotations for the `tokengen` Job
annotations: {}
+ # -- Affinity for tokengen Pods
+ affinity: {}
+ # -- Node selector for tokengen Pods
+ nodeSelector: {}
# -- Tolerations for tokengen Job
tolerations: []
# -- Additional volumes for Pods
@@ -575,6 +579,12 @@ enterprise:
labels: {}
# -- Additional annotations for the `provisioner` Job
annotations: {}
+  # -- Affinity for provisioner Pods
+  affinity: {}
+  # -- Node selector for provisioner Pods
+  nodeSelector: {}
+  # -- Tolerations for provisioner Pods
+  tolerations: []
# -- The name of the PriorityClass for provisioner Job
priorityClassName: null
# -- Run containers as user `enterprise-logs(uid=10001)`
diff --git a/tools/tsdb/migrate-versions/main.go b/tools/tsdb/migrate-versions/main.go
index e4fb39e69a4fa..d3853442b6e86 100644
--- a/tools/tsdb/migrate-versions/main.go
+++ b/tools/tsdb/migrate-versions/main.go
@@ -257,7 +257,7 @@ func uploadFile(idx shipperindex.Index, indexStorageClient shipperstorage.Client
}
}()
- gzipPool := compression.GetWriterPool(compression.EncGZIP)
+ gzipPool := compression.GetWriterPool(compression.GZIP)
compressedWriter := gzipPool.GetWriter(f)
defer gzipPool.PutWriter(compressedWriter)