Bloom gateway client: Use binary search to determine membership of fingerprints to server instances (#11336)

Instead of iterating over all fingerprints and determining their respective instance from the ring individually, this new approach calculates the token ranges of the instances from the ring, and matches and partitions the fingerprints based on those ranges, using a binary search to determine membership in a token range.

Signed-off-by: Christian Haudum <[email protected]>
(cherry picked from commit 19b0160)
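
A minimal, self-contained sketch of the idea, assuming the fingerprints are already sorted in ascending order. The names tokenRange and partitionByTokenRanges are illustrative only and are not the types introduced by this change, which operates on ring.InstanceDesc tokens and logproto.GroupedChunkRefs.

package main

import (
	"fmt"
	"math"
	"sort"
)

// tokenRange describes the part of the ring owned by one gateway instance.
type tokenRange struct {
	addr               string
	minToken, maxToken uint32
}

// partitionByTokenRanges assigns each fingerprint to the instance whose token
// range contains it. Because both slices are sorted ascending, the boundaries
// of each range can be found with two binary searches instead of one ring
// lookup per fingerprint.
func partitionByTokenRanges(fingerprints []uint32, ranges []tokenRange) map[string][]uint32 {
	out := make(map[string][]uint32, len(ranges))
	for _, r := range ranges {
		// first fingerprint that is >= minToken
		lo := sort.Search(len(fingerprints), func(i int) bool { return fingerprints[i] >= r.minToken })
		// first fingerprint that is > maxToken
		hi := sort.Search(len(fingerprints), func(i int) bool { return fingerprints[i] > r.maxToken })
		if lo == hi {
			continue // no fingerprints fall into this token range
		}
		out[r.addr] = append(out[r.addr], fingerprints[lo:hi]...)
	}
	return out
}

func main() {
	fingerprints := []uint32{10, 20, 300, 4000, 50000} // sorted ascending
	ranges := []tokenRange{
		{addr: "gateway-0", minToken: 0, maxToken: 100},
		{addr: "gateway-1", minToken: 101, maxToken: 10000},
		{addr: "gateway-2", minToken: 10001, maxToken: math.MaxUint32},
	}
	fmt.Println(partitionByTokenRanges(fingerprints, ranges))
	// map[gateway-0:[10 20] gateway-1:[300 4000] gateway-2:[50000]]
}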
chaudum authored and sandeepsukhani committed Dec 4, 2023
1 parent 489ac8d commit 4da48bc
Showing 2 changed files with 506 additions and 137 deletions.
276 changes: 177 additions & 99 deletions pkg/bloomgateway/client.go
@@ -7,6 +7,7 @@ import (
"io"
"math"
"math/rand"
"sort"
"sync"

"github.com/go-kit/log"
@@ -25,15 +26,13 @@ import (
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/util"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/util/constants"
)

var (
// groupedChunksRefPool pooling slice of logproto.GroupedChunkRefs [64, 128, 256, ..., 65536]
groupedChunksRefPool = queue.NewSlicePool[*logproto.GroupedChunkRefs](1<<6, 1<<16, 2)
// chunkRefsByAddrsPool pooling slice of chunkRefsByAddrs [64, 128, 256, ..., 65536]
chunkRefsByAddrsPool = queue.NewSlicePool[chunkRefsByAddrs](1<<6, 1<<16, 2)
// ringGetBuffersPool pooling for ringGetBuffers to avoid calling ring.MakeBuffersForGet() for each request
ringGetBuffersPool = sync.Pool{
New: func() interface{} {
@@ -170,27 +169,28 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
return groups, nil
}

// Get the addresses of corresponding bloom gateways for each series.
fingerprints, addrs, err := c.serverAddrsForFingerprints(tenant, groups)
subRing := GetShuffleShardingSubring(c.ring, tenant, c.limits)
rs, err := subRing.GetAllHealthy(BlocksRead)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "bloom gateway get healthy instances")
}

// Group chunk refs by addresses of one or more bloom gateways.
// All chunk refs of series that belong to one and the same bloom gateway are set in one batch.
streamsByAddr := c.groupStreamsByAddr(groups, addrs)
streamsByInst, err := c.groupFingerprintsByServer(groups, subRing, rs.Instances)
if err != nil {
return nil, err
}

filteredChunkRefs := groupedChunksRefPool.Get(len(fingerprints))
filteredChunkRefs := groupedChunksRefPool.Get(len(groups))
defer groupedChunksRefPool.Put(filteredChunkRefs)

for _, item := range streamsByAddr {
for _, item := range streamsByInst {
// randomize order of addresses so we don't hotspot the first server in the list
addrs := shuffleAddrs(item.addrs)
addrs := shuffleAddrs(item.instance.addrs)
err := c.doForAddrs(addrs, func(client logproto.BloomGatewayClient) error {
req := &logproto.FilterChunkRefRequest{
From: from,
Through: through,
Refs: item.refs,
Refs: item.fingerprints,
Filters: filters,
}
resp, err := client.FilterChunkRefs(ctx, req)
@@ -207,55 +207,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
return filteredChunkRefs, nil
}

// isEqualStringElements checks if two string slices contain the same elements.
// The order of the elements is ignored.
func isEqualStringElements(a, b []string) bool {
if len(a) != len(b) {
return false
}
for _, s := range a {
if !util.StringsContain(b, s) {
return false
}
}
return true
}

// listContainsAddrs checks if a slice of chunkRefAddrs contains an element
// whose field addrs contains the same addresses as the given slice of
// addresses.
// It returns the index of the element, if found, and a boolean whether the
// given list contains the given addrs.
func listContainsAddrs(list []chunkRefsByAddrs, addrs []string) (int, bool) {
for i, r := range list {
if isEqualStringElements(r.addrs, addrs) {
return i, true
}
}
return -1, false
}

type chunkRefsByAddrs struct {
addrs []string
refs []*logproto.GroupedChunkRefs
}

func (c *GatewayClient) groupStreamsByAddr(groups []*logproto.GroupedChunkRefs, addresses [][]string) []chunkRefsByAddrs {
res := chunkRefsByAddrsPool.Get(len(addresses))
defer chunkRefsByAddrsPool.Put(res)

for i := 0; i < len(addresses); i++ {
addrs := addresses[i]
refs := groups[i]
if idx, ok := listContainsAddrs(res, addrs); ok {
res[idx].refs = append(res[idx].refs, refs)
} else {
res = append(res, chunkRefsByAddrs{addrs: addrs, refs: []*logproto.GroupedChunkRefs{refs}})
}
}
return res
}

// doForAddrs sequentially calls the provided callback function fn for each
// address in the given slice addrs until the callback function does not return an
// error.
@@ -279,53 +230,180 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
return err
}

// serverAddrsForFingerprints returns a slice of server address slices for
// each fingerprint of the given fingerprints.
// The indexes of the returned slices correspond to each other.
// Returns an error in case the bloom gateway ring could not get the
// corresponding replica set for a given fingerprint.
// Warning: This function becomes inefficient when the number of fingerprints is very large.
func (c *GatewayClient) serverAddrsForFingerprints(tenantID string, groups []*logproto.GroupedChunkRefs) ([]uint64, [][]string, error) {
subRing := GetShuffleShardingSubring(c.ring, tenantID, c.limits)
func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, subRing ring.ReadRing, instances []ring.InstanceDesc) ([]instanceWithFingerprints, error) {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

rs, err := subRing.GetAllHealthy(BlocksRead)
if err != nil {
return nil, nil, errors.Wrap(err, "bloom gateway get healthy instances")
servers := make([]addrsWithTokenRange, 0, len(instances))
prev := -1
it := newInstanceSortMergeIterator(instances)
for it.Next() {
// We can use one of the tokens from the token range
// to obtain all addresses for that token.
rs, err := subRing.Get(it.At().token, BlocksRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
servers = append(servers, addrsWithTokenRange{
minToken: uint32(prev + 1),
maxToken: it.At().token,
id: it.At().instance.Id,
addrs: rs.GetAddresses(),
})
prev = int(it.At().token)
}

var numTokens int
for _, instanceDesc := range rs.Instances {
numTokens += len(instanceDesc.Tokens)
if len(servers) > 0 {
// append the instance for the token range between the greatest token and MaxUint32
servers = append(servers, addrsWithTokenRange{
minToken: uint32(prev),
maxToken: math.MaxUint32,
addrs: servers[0].addrs,
id: servers[0].id,
})
}

numFingerprints := len(groups)
if numFingerprints > int(float64(numTokens)*math.Log2(float64(numFingerprints))) {
// TODO(chaudum): Implement algorithm in O(n * m * log(k) + n) instead of O(k) by iterating over ring tokens
// and finding corresponding fingerprint ranges using binary search.
// n .. number of instances
// m .. number of tokens per instance
// k .. number of fingerprints
level.Warn(c.logger).Log("msg", "using an inefficient algorithm to determine server addresses for fingerprints", "fingerprints", numFingerprints, "tokens", numTokens)
boundedFingerprints := partitionFingerprintsByAddresses(groups, servers)
return groupByInstance(boundedFingerprints), nil
}

type instanceWithToken struct {
instance ring.InstanceDesc
token uint32
}

type addrsWithTokenRange struct {
id string
addrs []string
minToken, maxToken uint32
}

func (s addrsWithTokenRange) cmp(token uint32) v1.BoundsCheck {
if token < s.minToken {
return v1.Before
} else if token > s.maxToken {
return v1.After
}
return v1.Overlap
}

fingerprints := make([]uint64, numFingerprints)
addresses := make([][]string, numFingerprints)
type instanceWithFingerprints struct {
instance addrsWithTokenRange
fingerprints []*logproto.GroupedChunkRefs
}

buf := ringGetBuffersPool.Get().(*ringGetBuffers)
defer func() {
// Before returning the bufs to the pool, reset them to release them earlier for GC.
buf.Reset()
ringGetBuffersPool.Put(buf)
}()
func partitionFingerprintsByAddresses(fingerprints []*logproto.GroupedChunkRefs, addresses []addrsWithTokenRange) (result []instanceWithFingerprints) {
for _, instance := range addresses {

for idx, key := range groups {
rs, err = subRing.Get(uint32(key.Fingerprint), BlocksRead, buf.Descs, buf.Hosts, buf.Zones)
if err != nil {
return nil, nil, errors.Wrap(err, "bloom gateway get ring")
min := sort.Search(len(fingerprints), func(i int) bool {
return instance.cmp(uint32(fingerprints[i].Fingerprint)) > v1.Before
})

max := sort.Search(len(fingerprints), func(i int) bool {
return instance.cmp(uint32(fingerprints[i].Fingerprint)) == v1.After
})

// no fingerprint falls within the token range boundaries
if min == len(fingerprints) || max == 0 {
continue
}
fingerprints[idx] = key.Fingerprint
addresses[idx] = rs.GetAddresses()

result = append(result, instanceWithFingerprints{instance: instance, fingerprints: fingerprints[min:max]})
}

return result
}

// groupByInstance groups fingerprints by server instance
func groupByInstance(boundedFingerprints []instanceWithFingerprints) []instanceWithFingerprints {
if len(boundedFingerprints) == 0 {
return []instanceWithFingerprints{}
}

result := make([]instanceWithFingerprints, 0, len(boundedFingerprints))
pos := make(map[string]int, len(boundedFingerprints))

for _, cur := range boundedFingerprints {
if len(cur.fingerprints) == 0 {
continue
}
// Copy fingerprint slice, otherwise we mutate the original
// TODO(chaudum): Use SlicePool
tmp := make([]*logproto.GroupedChunkRefs, len(cur.fingerprints))
_ = copy(tmp, cur.fingerprints)

idx, ok := pos[cur.instance.id]
if ok {
result[idx].fingerprints = append(result[idx].fingerprints, tmp...)
continue
}

pos[cur.instance.id] = len(result)
result = append(result, instanceWithFingerprints{
instance: addrsWithTokenRange{
id: cur.instance.id,
addrs: cur.instance.addrs,
},
fingerprints: tmp,
})
}

return result
}

// newInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements
// where the tokens of the elements are sorted in ascending order.
func newInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[instanceWithToken] {
it := &sortMergeIterator[ring.InstanceDesc, uint32, instanceWithToken]{
items: instances,
transform: func(item ring.InstanceDesc, val uint32) instanceWithToken {
return instanceWithToken{instance: item, token: val}
},
}
sequences := make([]v1.PeekingIterator[IndexedValue[uint32]], 0, len(instances))
for i := range instances {
sort.Slice(instances[i].Tokens, func(a, b int) bool {
return instances[i].Tokens[a] < instances[i].Tokens[b]
})
iter := NewIterWithIndex[uint32](v1.NewSliceIter(instances[i].Tokens), i)
sequences = append(sequences, v1.NewPeekingIter[IndexedValue[uint32]](iter))
}
it.heap = v1.NewHeapIterator(
func(i, j IndexedValue[uint32]) bool {
return i.val < j.val
},
sequences...,
)
it.err = nil

return it
}

// sortMergeIterator implements v1.Iterator
type sortMergeIterator[T any, C comparable, R any] struct {
curr R
heap *v1.HeapIterator[IndexedValue[C]]
items []T
transform func(T, C) R
err error
}

func (it *sortMergeIterator[T, C, R]) Next() bool {
ok := it.heap.Next()
if !ok {
it.err = io.EOF
return false
}

group := it.heap.At()
it.curr = it.transform(it.items[group.idx], group.val)

return true
}

func (it *sortMergeIterator[T, C, R]) At() R {
return it.curr
}

return fingerprints, addresses, nil
func (it *sortMergeIterator[T, C, R]) Err() error {
return it.err
}
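
As a companion to the diff above, the following standalone sketch shows one way sorted token ranges could be derived from the instances' token lists. It simply flattens and sorts all tokens rather than using the heap-based sort-merge iterator from the actual change; the names instanceToken, tokenRange, and tokenRangesFor are illustrative only.

package main

import (
	"fmt"
	"math"
	"sort"
)

// instanceToken pairs a ring token with the address of the instance that owns it.
type instanceToken struct {
	addr  string
	token uint32
}

// tokenRange mirrors the idea of addrsWithTokenRange from the diff:
// the range [minToken, maxToken] is served by the instance at addr.
type tokenRange struct {
	addr               string
	minToken, maxToken uint32
}

// tokenRangesFor sorts all tokens of all instances and turns each consecutive
// pair of tokens into a range; the remainder up to MaxUint32 wraps around to
// the first instance, as on a hash ring.
func tokenRangesFor(tokens []instanceToken) []tokenRange {
	sort.Slice(tokens, func(i, j int) bool { return tokens[i].token < tokens[j].token })
	ranges := make([]tokenRange, 0, len(tokens)+1)
	prev := -1
	for _, t := range tokens {
		ranges = append(ranges, tokenRange{addr: t.addr, minToken: uint32(prev + 1), maxToken: t.token})
		prev = int(t.token)
	}
	if len(ranges) > 0 {
		ranges = append(ranges, tokenRange{addr: ranges[0].addr, minToken: uint32(prev + 1), maxToken: math.MaxUint32})
	}
	return ranges
}

func main() {
	tokens := []instanceToken{
		{addr: "gateway-1", token: 3000},
		{addr: "gateway-0", token: 100},
		{addr: "gateway-0", token: 90000},
	}
	for _, r := range tokenRangesFor(tokens) {
		fmt.Printf("%s: [%d, %d]\n", r.addr, r.minToken, r.maxToken)
	}
}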