Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(bloomstore): Cache metas LIST operation #12414

Merged
merged 4 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"hash"
"io"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
Expand Down Expand Up @@ -417,3 +419,88 @@ func findPeriod(configs []config.PeriodConfig, ts model.Time) (config.DayTime, e
}
return config.DayTime{}, fmt.Errorf("can not find period for timestamp %d", ts)
}

type listOpResult struct {
ts time.Time
objects []client.StorageObject
prefixes []client.StorageCommonPrefix
}

type listOpCache map[string]listOpResult

type cachedListOpObjectClient struct {
client.ObjectClient
cache listOpCache
mtx sync.RWMutex
ttl, interval time.Duration
done chan struct{}
}

func newCachedListOpObjectClient(oc client.ObjectClient, ttl, interval time.Duration) *cachedListOpObjectClient {
client := &cachedListOpObjectClient{
ObjectClient: oc,
cache: make(listOpCache),
done: make(chan struct{}),
ttl: ttl,
interval: interval,
}

go func(c *cachedListOpObjectClient) {
ticker := time.NewTicker(c.interval)
defer ticker.Stop()

for {
select {
case <-c.done:
return
case <-ticker.C:
c.mtx.Lock()
for k := range c.cache {
if time.Since(c.cache[k].ts) > c.ttl {
delete(c.cache, k)
}
}
c.mtx.Unlock()
}
}
}(client)

return client
}

func (c *cachedListOpObjectClient) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
if delimiter != "" {
return nil, nil, fmt.Errorf("does not support LIST calls with delimiter: %s", delimiter)
}
c.mtx.RLock()
cached, found := c.cache[prefix]
c.mtx.RUnlock()
if found {
return cached.objects, cached.prefixes, nil
}

c.mtx.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be moved after the c.ObjectClient.List? Right now it's blocking any List of till the inner client finished.
IIUC we might want to do this so two lists doesn't happen at the same time but I think that's not ideal. Would make sense to block prevent two simultaneus calls for the same prefix tho.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I would like to avoid multiple concurrent LIST calls. You still need to sync the map access thereafter. So I think it makes sense to lock the whole operation.

Having separate locks for different prefixes makes eviction more complex. Not sure if that's worth it then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. We can re-evaluate if we see lock contention.

defer c.mtx.Unlock()

objects, prefixes, err := c.ObjectClient.List(ctx, prefix, delimiter)
if err != nil {
return nil, nil, err
}

c.cache[prefix] = listOpResult{
ts: time.Now(),
objects: objects,
prefixes: prefixes,
}

return objects, prefixes, err
}

func (c *cachedListOpObjectClient) Stop() {
c.mtx.Lock()
defer c.mtx.Unlock()

close(c.done)
c.cache = nil
c.ObjectClient.Stop()
}
67 changes: 67 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"testing"
"time"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/stretchr/testify/require"

v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/testutils"
"github.com/grafana/loki/v3/pkg/storage/config"
)
Expand Down Expand Up @@ -341,3 +343,68 @@ func TestBloomClient_DeleteBlocks(t *testing.T) {
require.False(t, found)
})
}

type mockListClient struct {
client.ObjectClient
counter int
}

func (c *mockListClient) List(_ context.Context, prefix string, _ string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
c.counter++
objects := []client.StorageObject{
{Key: path.Join(path.Base(prefix), "object")},
}
prefixes := []client.StorageCommonPrefix{
client.StorageCommonPrefix(prefix),
}
return objects, prefixes, nil
}

func (c *mockListClient) Stop() {
}

func TestBloomClient_CachedListOpObjectClient(t *testing.T) {

t.Run("list call with delimiter returns error", func(t *testing.T) {
downstreamClient := &mockListClient{}
c := newCachedListOpObjectClient(downstreamClient, 100*time.Millisecond, 10*time.Millisecond)
t.Cleanup(c.Stop)

_, _, err := c.List(context.Background(), "prefix/", "/")
require.Error(t, err)
})

t.Run("list calls are cached by prefix", func(t *testing.T) {
downstreamClient := &mockListClient{}
c := newCachedListOpObjectClient(downstreamClient, 100*time.Millisecond, 10*time.Millisecond)
t.Cleanup(c.Stop)

// cache miss
res, _, err := c.List(context.Background(), "a/", "")
require.NoError(t, err)
require.Equal(t, 1, downstreamClient.counter)
require.Equal(t, []client.StorageObject{{Key: "a/object"}}, res)

// cache miss
res, _, err = c.List(context.Background(), "b/", "")
require.NoError(t, err)
require.Equal(t, 2, downstreamClient.counter)
require.Equal(t, []client.StorageObject{{Key: "b/object"}}, res)

// cache hit
res, _, err = c.List(context.Background(), "a/", "")
require.NoError(t, err)
require.Equal(t, 2, downstreamClient.counter)
require.Equal(t, []client.StorageObject{{Key: "a/object"}}, res)

// wait for >=ttl so items are expired
time.Sleep(150 * time.Millisecond)

// cache miss
res, _, err = c.List(context.Background(), "a/", "")
require.NoError(t, err)
require.Equal(t, 3, downstreamClient.counter)
require.Equal(t, []client.StorageObject{{Key: "a/object"}}, res)
})

}
7 changes: 7 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type Config struct {
DownloadParallelism int `yaml:"download_parallelism"`
BlocksCache BlocksCacheConfig `yaml:"blocks_cache"`
MetasCache cache.Config `yaml:"metas_cache"`

// This will always be set to true when flags are registered.
// In tests, where config is created as literal, it can be set manually.
CacheListOps bool `yaml:"-"`
}

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
Expand All @@ -27,6 +31,9 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&c.DownloadParallelism, prefix+"download-parallelism", 16, "The amount of maximum concurrent bloom blocks downloads.")
c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour)
c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f)

// always cache LIST operations
c.CacheListOps = true
}

func (c *Config) Validate() error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"path"
"sort"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -294,6 +295,9 @@ func NewBloomStore(
return nil, errors.Wrapf(err, "creating object client for period %s", periodicConfig.From)
}

if storageConfig.BloomShipperConfig.CacheListOps {
objectClient = newCachedListOpObjectClient(objectClient, 5*time.Minute, 10*time.Second)
}
bloomClient, err := NewBloomClient(cfg, objectClient, logger)
if err != nil {
return nil, errors.Wrapf(err, "creating bloom client for period %s", periodicConfig.From)
Expand Down
Loading