Skip to content

Commit

Permalink
fix(blooms): Reduce jumps between buckets in blocks pool
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Mar 26, 2024
1 parent 9af191f commit df48414
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
return nil, errors.Wrap(err, "seeking to series page")
}

data := BlockPool.Get(header.Len)[:header.Len]
defer BlockPool.Put(data)
data := seriesPagePool.Get(header.Len)[:header.Len]
defer seriesPagePool.Put(data)
_, err = io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading series page")
Expand Down
28 changes: 24 additions & 4 deletions pkg/storage/bloom/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"io"
"sync"

"github.com/prometheus/prometheus/util/pool"
"github.com/grafana/loki/pkg/util/pool"
)

const (
Expand All @@ -32,10 +32,30 @@ var (
},
}

// 4KB -> 128MB
BlockPool = BytePool{
// 1KB -> 1MB, 2x growth
seriesPagePool = BytePool{
pool: pool.New(
4<<10, 128<<20, 4,
1<<10, 1<<20, 2,
func(size int) interface{} {
return make([]byte, size)
}),
}

// 1MB -> 128MB
BlockPool = BytePool{
pool: pool.NewWithSizes(
[]int{
1 << 20,
2 << 20,
4 << 20,
8 << 20,
16 << 20,
32 << 20,
48 << 20,
64 << 20,
96 << 20,
128 << 20,
},
func(size int) interface{} {
return make([]byte, size)
}),
Expand Down
90 changes: 90 additions & 0 deletions pkg/util/pool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package pool

import (
"fmt"
"reflect"
"sync"
)

// NOTE this is pretty much a copy of prometheus pool

// Pool is a bucketed pool for variably sized byte slices.
type Pool struct {
buckets []sync.Pool
sizes []int
// make is the function used to create an empty slice when none exist yet.
make func(int) interface{}
}

// New returns a new Pool with size buckets for minSize to maxSize
// increasing by the given factor.
func New(minSize, maxSize int, factor float64, makeFunc func(int) interface{}) *Pool {
if minSize < 1 {
panic("invalid minimum pool size")
}
if maxSize < 1 {
panic("invalid maximum pool size")
}
if factor < 2 {
panic("invalid factor")
}

var sizes []int

for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
}

p := &Pool{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
make: makeFunc,
}

return p
}

func NewWithSizes(sizes []int, makeFunc func(int) interface{}) *Pool {
if len(sizes) < 1 {
panic("invalid pool sizes")
}

p := &Pool{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
make: makeFunc,
}

return p
}

// Get returns a new byte slices that fits the given size.
func (p *Pool) Get(sz int) interface{} {
for i, bktSize := range p.sizes {
if sz > bktSize {
continue
}
b := p.buckets[i].Get()
if b == nil {
b = p.make(bktSize)
}
return b
}
return p.make(sz)
}

// Put adds a slice to the right bucket in the pool.
func (p *Pool) Put(s interface{}) {
slice := reflect.ValueOf(s)

if slice.Kind() != reflect.Slice {
panic(fmt.Sprintf("%+v is not a slice", slice))
}
for i, size := range p.sizes {
if slice.Cap() > size {
continue
}
p.buckets[i].Put(slice.Slice(0, 0).Interface())
return
}
}
62 changes: 62 additions & 0 deletions pkg/util/pool/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package pool

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestPoolNew(t *testing.T) {
testPool := New(1, 8, 2, func(size int) interface{} {
return make([]int, size)
})
cases := []struct {
size int
expectedCap int
}{
{
size: -1,
expectedCap: 1,
},
{
size: 3,
expectedCap: 4,
},
{
size: 10,
expectedCap: 10,
},
}
for _, c := range cases {
ret := testPool.Get(c.size)
require.Equal(t, c.expectedCap, cap(ret.([]int)))
testPool.Put(ret)
}
}

func TestPoolNewWithSizes(t *testing.T) {
testPool := NewWithSizes([]int{1, 2, 4, 8}, func(size int) interface{} {
return make([]int, size)
})
cases := []struct {
size int
expectedCap int
}{
{
size: -1,
expectedCap: 1,
},
{
size: 3,
expectedCap: 4,
},
{
size: 10,
expectedCap: 10,
},
}
for _, c := range cases {
ret := testPool.Get(c.size)
require.Equal(t, c.expectedCap, cap(ret.([]int)))
testPool.Put(ret)
}
}

0 comments on commit df48414

Please sign in to comment.