diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index ba9e5c723515c..d3265e86445fe 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -4727,15 +4727,16 @@ The cache block configures the cache backend. The supported CLI flags `` background: # At what concurrency to write back to cache. # CLI flag: -.background.write-back-concurrency - [writeback_goroutines: | default = 10] + [writeback_goroutines: | default = 1] - # How many key batches to buffer for background write-back. + # How many key batches to buffer for background write-back. Default is large + # to prefer size based limiting. # CLI flag: -.background.write-back-buffer - [writeback_buffer: | default = 10000] + [writeback_buffer: | default = 500000] # Size limit in bytes for background write-back. # CLI flag: -.background.write-back-size-limit - [writeback_size_limit: | default = 1GB] + [writeback_size_limit: | default = 500MB] memcached: # How long keys stay in the memcache. @@ -4744,11 +4745,11 @@ memcached: # How many keys to fetch in each batch. # CLI flag: -.memcached.batchsize - [batch_size: | default = 256] + [batch_size: | default = 4] # Maximum active requests to memcache. # CLI flag: -.memcached.parallelism - [parallelism: | default = 10] + [parallelism: | default = 5] memcached_client: # Hostname for memcached service to use. If empty and if addresses is unset, @@ -4942,14 +4943,6 @@ embedded_cache: # The time to live for items in the cache before they get purged. # CLI flag: -.embedded-cache.ttl [ttl: | default = 1h] - -# The maximum number of concurrent asynchronous writeback cache can occur. -# CLI flag: -.max-async-cache-write-back-concurrency -[async_cache_write_back_concurrency: | default = 16] - -# The maximum number of enqueued asynchronous writeback cache allowed. -# CLI flag: -.max-async-cache-write-back-buffer-size -[async_cache_write_back_buffer_size: | default = 500] ``` ### period_config diff --git a/docs/sources/setup/upgrade/_index.md b/docs/sources/setup/upgrade/_index.md index 2cec750395521..4badb1a4c52a8 100644 --- a/docs/sources/setup/upgrade/_index.md +++ b/docs/sources/setup/upgrade/_index.md @@ -107,6 +107,15 @@ Going forward compactor will run compaction and retention on all the object stor `-compactor.delete-request-store` or its YAML setting should be explicitly configured when retention is enabled, this is required for storing delete requests. The path prefix under which the delete requests are stored is decided by `-compactor.delete-request-store.key-prefix`, it defaults to `index/`. +#### Configuration `async_cache_write_back_concurrency` and `async_cache_write_back_buffer_size` have been removed + +These configurations were redundant with the `Background` configuration in the [cache-config]({{< relref "../../configure#cache_config" >}}). + +`async_cache_write_back_concurrency` can be set with `writeback_goroutines` +`async_cache_write_back_buffer_size` can be set with `writeback_buffer` + +additionally the `Background` configuration also lest you set `writeback_size_limit` which can be used to set a maximum amount of memory to use for writeback objects vs a count of objects. + #### Configuration `use_boltdb_shipper_as_backup` is removed The setting `use_boltdb_shipper_as_backup` (`-tsdb.shipper.use-boltdb-shipper-as-backup`) was a remnant from the development of the TSDB storage. diff --git a/pkg/storage/chunk/cache/background.go b/pkg/storage/chunk/cache/background.go index 859bdf96f9160..c5899e3c8bbb3 100644 --- a/pkg/storage/chunk/cache/background.go +++ b/pkg/storage/chunk/cache/background.go @@ -26,9 +26,9 @@ type BackgroundConfig struct { // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *BackgroundConfig) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) { - f.IntVar(&cfg.WriteBackGoroutines, prefix+"background.write-back-concurrency", 10, description+"At what concurrency to write back to cache.") - f.IntVar(&cfg.WriteBackBuffer, prefix+"background.write-back-buffer", 10000, description+"How many key batches to buffer for background write-back.") - _ = cfg.WriteBackSizeLimit.Set("1GB") + f.IntVar(&cfg.WriteBackGoroutines, prefix+"background.write-back-concurrency", 1, description+"At what concurrency to write back to cache.") + f.IntVar(&cfg.WriteBackBuffer, prefix+"background.write-back-buffer", 500000, description+"How many key batches to buffer for background write-back. Default is large to prefer size based limiting.") + _ = cfg.WriteBackSizeLimit.Set("500MB") f.Var(&cfg.WriteBackSizeLimit, prefix+"background.write-back-size-limit", description+"Size limit in bytes for background write-back.") } diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 6e1565fcaa3e8..9239fe88d7f05 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -38,11 +38,6 @@ type Config struct { // For tests to inject specific implementations. Cache Cache `yaml:"-"` - - // AsyncCacheWriteBackConcurrency specifies the number of goroutines to use when asynchronously writing chunks fetched from the store to the chunk cache. - AsyncCacheWriteBackConcurrency int `yaml:"async_cache_write_back_concurrency"` - // AsyncCacheWriteBackBufferSize specifies the maximum number of fetched chunks to buffer for writing back to the chunk cache. - AsyncCacheWriteBackBufferSize int `yaml:"async_cache_write_back_buffer_size"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet @@ -52,8 +47,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f) cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f) cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix+"embedded-cache.", description, f) - f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.") - f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.") cfg.Prefix = prefix diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index 5595b2df0a6cf..23550dd34965e 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -131,7 +131,7 @@ func testChunkFetcher(t *testing.T, c cache.Cache, chunks []chunk.Chunk) { }, } - fetcher, err := fetcher.New(c, nil, false, s, nil, 10, 100, 0) + fetcher, err := fetcher.New(c, nil, false, s, nil, 0) require.NoError(t, err) defer fetcher.Stop() diff --git a/pkg/storage/chunk/cache/memcached.go b/pkg/storage/chunk/cache/memcached.go index ca8e2e2f92da2..8e47168afc2ad 100644 --- a/pkg/storage/chunk/cache/memcached.go +++ b/pkg/storage/chunk/cache/memcached.go @@ -30,8 +30,8 @@ type MemcachedConfig struct { // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *MemcachedConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { f.DurationVar(&cfg.Expiration, prefix+"memcached.expiration", 0, description+"How long keys stay in the memcache.") - f.IntVar(&cfg.BatchSize, prefix+"memcached.batchsize", 256, description+"How many keys to fetch in each batch.") - f.IntVar(&cfg.Parallelism, prefix+"memcached.parallelism", 10, description+"Maximum active requests to memcache.") + f.IntVar(&cfg.BatchSize, prefix+"memcached.batchsize", 4, description+"How many keys to fetch in each batch.") + f.IntVar(&cfg.Parallelism, prefix+"memcached.parallelism", 5, description+"Maximum active requests to memcache.") } // Memcached type caches chunks in memcached diff --git a/pkg/storage/chunk/fetcher/fetcher.go b/pkg/storage/chunk/fetcher/fetcher.go index 7801143932842..cf763b9cbedc9 100644 --- a/pkg/storage/chunk/fetcher/fetcher.go +++ b/pkg/storage/chunk/fetcher/fetcher.go @@ -2,7 +2,6 @@ package fetcher import ( "context" - "errors" "sync" "time" @@ -23,19 +22,6 @@ import ( ) var ( - errAsyncBufferFull = errors.New("the async buffer is full") - skipped = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_skipped_buffer_full_total", - Help: "Total number of operations against cache that have been skipped.", - }) - chunkFetcherCacheQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_enqueued_total", - Help: "Total number of chunks enqueued to a buffer to be asynchronously written back to the chunk cache.", - }) - chunkFetcherCacheQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_dequeued_total", - Help: "Total number of chunks asynchronously dequeued from a buffer and written back to the chunk cache.", - }) cacheCorrupt = promauto.NewCounter(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "cache_corrupt_chunks_total", @@ -69,12 +55,7 @@ type Fetcher struct { wait sync.WaitGroup decodeRequests chan decodeRequest - maxAsyncConcurrency int - maxAsyncBufferSize int - - asyncQueue chan []chunk.Chunk - stopOnce sync.Once - stop chan struct{} + stopOnce sync.Once } type decodeRequest struct { @@ -89,18 +70,15 @@ type decodeResponse struct { } // New makes a new ChunkFetcher. -func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) { +func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration) (*Fetcher, error) { c := &Fetcher{ - schema: schema, - storage: storage, - cache: cache, - cachel2: cachel2, - l2CacheHandoff: l2CacheHandoff, - cacheStubs: cacheStubs, - decodeRequests: make(chan decodeRequest), - maxAsyncConcurrency: maxAsyncConcurrency, - maxAsyncBufferSize: maxAsyncBufferSize, - stop: make(chan struct{}), + schema: schema, + storage: storage, + cache: cache, + cachel2: cachel2, + l2CacheHandoff: l2CacheHandoff, + cacheStubs: cacheStubs, + decodeRequests: make(chan decodeRequest), } c.wait.Add(chunkDecodeParallelism) @@ -108,48 +86,15 @@ func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config. go c.worker() } - // Start a number of goroutines - processing async operations - equal - // to the max concurrency we have. - c.asyncQueue = make(chan []chunk.Chunk, c.maxAsyncBufferSize) - for i := 0; i < c.maxAsyncConcurrency; i++ { - go c.asyncWriteBackCacheQueueProcessLoop() - } - return c, nil } -func (c *Fetcher) writeBackCacheAsync(fromStorage []chunk.Chunk) error { - select { - case c.asyncQueue <- fromStorage: - chunkFetcherCacheQueueEnqueue.Add(float64(len(fromStorage))) - return nil - default: - return errAsyncBufferFull - } -} - -func (c *Fetcher) asyncWriteBackCacheQueueProcessLoop() { - for { - select { - case fromStorage := <-c.asyncQueue: - chunkFetcherCacheQueueDequeue.Add(float64(len(fromStorage))) - cacheErr := c.WriteBackCache(context.Background(), fromStorage) - if cacheErr != nil { - level.Warn(util_log.Logger).Log("msg", "could not write fetched chunks from storage into chunk cache", "err", cacheErr) - } - case <-c.stop: - return - } - } -} - // Stop the ChunkFetcher. func (c *Fetcher) Stop() { c.stopOnce.Do(func() { close(c.decodeRequests) c.wait.Wait() c.cache.Stop() - close(c.stop) }) } @@ -267,10 +212,8 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun st.AddCacheBytesSent(stats.ChunkCache, bytes) // Always cache any chunks we did get - if cacheErr := c.writeBackCacheAsync(fromStorage); cacheErr != nil { - if cacheErr == errAsyncBufferFull { - skipped.Inc() - } + + if cacheErr := c.WriteBackCache(ctx, fromStorage); cacheErr != nil { level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) } diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go index c6215bde5b980..902b0dae1d743 100644 --- a/pkg/storage/chunk/fetcher/fetcher_test.go +++ b/pkg/storage/chunk/fetcher/fetcher_test.go @@ -193,7 +193,7 @@ func Test(t *testing.T) { assert.NoError(t, chunkClient.PutChunks(context.Background(), test.storeStart)) // Build fetcher - f, err := New(c1, c2, false, sc, chunkClient, 1, 1, test.handoff) + f, err := New(c1, c2, false, sc, chunkClient, test.handoff) assert.NoError(t, err) // Run the test @@ -290,7 +290,7 @@ func BenchmarkFetch(b *testing.B) { _ = chunkClient.PutChunks(context.Background(), test.storeStart) // Build fetcher - f, _ := New(c1, c2, false, sc, chunkClient, 1, 1, test.handoff) + f, _ := New(c1, c2, false, sc, chunkClient, test.handoff) for i := 0; i < b.N; i++ { _, err := f.FetchChunks(context.Background(), test.fetch) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 1a4fa386062f7..9e50d1531d587 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -199,7 +199,7 @@ func (s *LokiStore) init() error { if err != nil { return err } - f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize, s.storeCfg.L2ChunkCacheHandoff) + f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff) if err != nil { return err } diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go index 823f5bf11f0a1..cac84a17ebfbf 100644 --- a/pkg/storage/stores/series_store_write_test.go +++ b/pkg/storage/stores/series_store_write_test.go @@ -155,7 +155,7 @@ func TestChunkWriter_PutOne(t *testing.T) { idx := &mockIndexWriter{} client := &mockChunksClient{} - f, err := fetcher.New(cache, nil, false, schemaConfig, client, 1, 1, 0) + f, err := fetcher.New(cache, nil, false, schemaConfig, client, 0) require.NoError(t, err) cw := NewChunkWriter(f, schemaConfig, idx, true) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 7743bce2fb0f9..db71f70af5a85 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -260,7 +260,7 @@ func (m *mockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time, panic(err) } - f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 10, 100, 0) + f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 0) if err != nil { panic(err) }