Skip to content

Commit

Permalink
refactor: do not force chunk_retain_period value for TSDB index type (#…
Browse files Browse the repository at this point in the history
…12475)

Signed-off-by: Edward Welch <[email protected]>
  • Loading branch information
slim-bean authored Apr 4, 2024
1 parent 9c841a7 commit 4117523
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 9 deletions.
11 changes: 7 additions & 4 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ func applyPathPrefixDefaults(r, defaults *ConfigWrapper) {
}
if len(r.StorageConfig.BloomShipperConfig.WorkingDirectory) == 1 &&
len(r.StorageConfig.BloomShipperConfig.WorkingDirectory) == len(defaults.StorageConfig.BloomShipperConfig.WorkingDirectory) &&

r.StorageConfig.BloomShipperConfig.WorkingDirectory[0] == defaults.StorageConfig.BloomShipperConfig.WorkingDirectory[0] {
_ = r.StorageConfig.BloomShipperConfig.WorkingDirectory.Set(fmt.Sprintf("%s/blooms", prefix))
}
Expand Down Expand Up @@ -676,8 +675,12 @@ func applyIngesterReplicationFactor(cfg *ConfigWrapper) {
// for at least as long as the TTL on the index queries cache.
func applyChunkRetain(cfg, defaults *ConfigWrapper) {
if !reflect.DeepEqual(cfg.StorageConfig.IndexQueriesCacheConfig, defaults.StorageConfig.IndexQueriesCacheConfig) {
// Set the retain period to the cache validity plus one minute. One minute is arbitrary but leaves some
// buffer to make sure the chunks are there until the index entries expire.
cfg.Ingester.RetainPeriod = cfg.StorageConfig.IndexCacheValidity + 1*time.Minute
// Only apply this change if the active index period is for boltdb-shipper
p := config.ActivePeriodConfig(cfg.SchemaConfig.Configs)
if cfg.SchemaConfig.Configs[p].IndexType == config.BoltDBShipperType {
// Set the retain period to the cache validity plus one minute. One minute is arbitrary but leaves some
// buffer to make sure the chunks are there until the index entries expire.
cfg.Ingester.RetainPeriod = cfg.StorageConfig.IndexCacheValidity + 1*time.Minute
}
}
}
54 changes: 54 additions & 0 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,15 @@ chunk_store_config:
t.Run("for the index queries cache config", func(t *testing.T) {
t.Run("no embedded cache enabled by default if Redis is set", func(t *testing.T) {
configFileString := `---
schema_config:
configs:
- from: 2020-10-24
store: boltdb-shipper
object_store: filesystem
schema: v12
index:
prefix: index_
period: 24h
storage_config:
index_queries_cache_config:
redis:
Expand All @@ -886,6 +895,15 @@ storage_config:

t.Run("no embedded cache enabled by default if Memcache is set", func(t *testing.T) {
configFileString := `---
schema_config:
configs:
- from: 2020-10-24
store: boltdb-shipper
object_store: filesystem
schema: v12
index:
prefix: index_
period: 24h
storage_config:
index_queries_cache_config:
memcached_client:
Expand Down Expand Up @@ -1581,6 +1599,15 @@ func Test_applyChunkRetain(t *testing.T) {

t.Run("chunk retain is set to IndexCacheValidity + 1 minute", func(t *testing.T) {
yamlContent := `
schema_config:
configs:
- from: 2020-10-24
store: boltdb-shipper
object_store: filesystem
schema: v12
index:
prefix: index_
period: 24h
storage_config:
index_cache_validity: 10m
index_queries_cache_config:
Expand All @@ -1596,6 +1623,33 @@ storage_config:
assert.NoError(t, err)
assert.Equal(t, 11*time.Minute, config.Ingester.RetainPeriod)
})

t.Run("chunk retain is not changed for tsdb index type", func(t *testing.T) {
yamlContent := `
schema_config:
configs:
- from: 2020-10-24
store: tsdb
object_store: filesystem
schema: v12
index:
prefix: index_
period: 24h
storage_config:
index_cache_validity: 10m
index_queries_cache_config:
memcached:
batch_size: 256
parallelism: 10
memcached_client:
consistent_hash: true
host: memcached-index-queries.loki-bigtable.svc.cluster.local
service: memcached-client
`
config, _, err := configWrapperFromYAML(t, yamlContent, nil)
assert.NoError(t, err)
assert.Equal(t, time.Duration(0), config.Ingester.RetainPeriod)
})
}

func Test_replicationFactor(t *testing.T) {
Expand Down
14 changes: 13 additions & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/grafana/loki/v3/pkg/scheduler"
internalserver "github.com/grafana/loki/v3/pkg/server"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
Expand Down Expand Up @@ -276,8 +277,19 @@ func (c *Config) Validate() error {

var errs []error

// Schema version 13 is required to use structured metadata
p := config.ActivePeriodConfig(c.SchemaConfig.Configs)

// If the active index type is not TSDB (which does not use an index cache)
// and the index queries cache is configured
// and the chunk retain period is less than the validity period of the index cache
// throw an error.
if c.SchemaConfig.Configs[p].IndexType != config.TSDBType &&
cache.IsCacheConfigured(c.StorageConfig.IndexQueriesCacheConfig) &&
c.Ingester.RetainPeriod < c.StorageConfig.IndexCacheValidity {
errs = append(errs, fmt.Errorf("CONFIG ERROR: the active index is %s which is configured to use an `index_cache_validty` (TTL) of %s, however the chunk_retain_period is %s which is LESS than the `index_cache_validity`. This can lead to query gaps, please configure the `chunk_retain_period` to be greater than the `index_cache_validity`", c.SchemaConfig.Configs[p].IndexType, c.StorageConfig.IndexCacheValidity, c.Ingester.RetainPeriod))
}

// Schema version 13 is required to use structured metadata
version, err := c.SchemaConfig.Configs[p].VersionAsInt()
if err != nil {
return err
Expand Down
4 changes: 0 additions & 4 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,10 +733,6 @@ func (t *Loki) updateConfigForShipperStore() {
TTL: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
},
}
// Force the retain period to be longer than the IndexCacheValidity used in the store, this guarantees we don't
// have query gaps on chunks flushed after an index entry is cached by keeping them retained in the ingester
// and queried as part of live data until the cache TTL expires on the index entry.
t.Cfg.Ingester.RetainPeriod = t.Cfg.StorageConfig.IndexCacheValidity + 1*time.Minute

// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeWriteOnly
Expand Down

0 comments on commit 4117523

Please sign in to comment.