Skip to content

Commit

Permalink
Move common code from bloomgateway into shared packages
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Dec 5, 2023
1 parent efe32bb commit 75e166f
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 201 deletions.
77 changes: 9 additions & 68 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/queue"
Expand Down Expand Up @@ -234,31 +235,29 @@ func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChun
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

servers := make([]addrsWithTokenRange, 0, len(instances))
prev := -1
it := newInstanceSortMergeIterator(instances)
it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
// We can use one of the tokens from the token range
// to obtain all addresses for that token.
rs, err := subRing.Get(it.At().token, BlocksRead, bufDescs, bufHosts, bufZones)
rs, err := subRing.Get(it.At().MaxToken, BlocksRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
servers = append(servers, addrsWithTokenRange{
minToken: uint32(prev + 1),
maxToken: it.At().token,
id: it.At().instance.Id,
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
minToken: it.At().MinToken,
maxToken: it.At().MaxToken,
})
prev = int(it.At().token)
}

if len(servers) > 0 {
// append the instance for the token range between the greatest token and MaxUint32
servers = append(servers, addrsWithTokenRange{
minToken: uint32(prev),
maxToken: math.MaxUint32,
addrs: servers[0].addrs,
id: servers[0].id,
addrs: servers[0].addrs,
minToken: servers[len(servers)-1].maxToken + 1,
maxToken: math.MaxUint32,
})
}

Expand Down Expand Up @@ -349,61 +348,3 @@ func groupByInstance(boundedFingerprints []instanceWithFingerprints) []instanceW

return result
}

// newInstanceSortMergeIterator builds an iterator over instanceWithToken
// elements, one per token of every given instance, ordered by ascending
// token value.
//
// Note: the Tokens slice of each instance is sorted in place, so the
// caller's instances are modified by this call.
func newInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[instanceWithToken] {
	// Build one peeking sequence per instance. Every element is tagged with
	// the position of its instance so it can be resolved again later.
	sequences := make([]v1.PeekingIterator[IndexedValue[uint32]], 0, len(instances))
	for i := range instances {
		tokens := instances[i].Tokens
		sort.Slice(tokens, func(a, b int) bool {
			return tokens[a] < tokens[b]
		})
		indexed := NewIterWithIndex[uint32](v1.NewSliceIter(tokens), i)
		sequences = append(sequences, v1.NewPeekingIter[IndexedValue[uint32]](indexed))
	}

	// Elements of the individual sequences are compared by token value.
	lessByToken := func(a, b IndexedValue[uint32]) bool {
		return a.val < b.val
	}

	return &sortMergeIterator[ring.InstanceDesc, uint32, instanceWithToken]{
		items: instances,
		transform: func(desc ring.InstanceDesc, tok uint32) instanceWithToken {
			return instanceWithToken{instance: desc, token: tok}
		},
		heap: v1.NewHeapIterator(lessByToken, sequences...),
		err:  nil,
	}
}

// sortMergeIterator implements v1.Iterator.
//
// Each element taken from the heap iterator carries an index and a value.
// The index selects the source item from items, and transform turns that
// item together with the value into the result exposed through At.
type sortMergeIterator[T any, C comparable, R any] struct {
	// curr is the result produced by the most recent successful Next.
	curr R
	// heap yields the (index, value) pairs to iterate over.
	heap *v1.HeapIterator[IndexedValue[C]]
	// items holds the source items addressed by the index of a heap element.
	items []T
	// transform builds a result from a source item and a heap value.
	transform func(T, C) R
	// err is set to io.EOF once the heap iterator is exhausted.
	err error
}

// Next advances to the next heap element and stores its transformed result.
// It returns false, and records io.EOF as the error, when the heap iterator
// has no more elements.
func (it *sortMergeIterator[T, C, R]) Next() bool {
	if it.heap.Next() {
		elem := it.heap.At()
		it.curr = it.transform(it.items[elem.idx], elem.val)
		return true
	}

	it.err = io.EOF
	return false
}

// At returns the result stored by the last successful call to Next.
func (it *sortMergeIterator[T, C, R]) At() R {
	return it.curr
}

// Err returns the recorded error; this is io.EOF after Next returned false.
func (it *sortMergeIterator[T, C, R]) Err() error {
	return it.err
}
45 changes: 9 additions & 36 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/validation"
)

func TestBloomGatewayClient(t *testing.T) {

logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

Expand All @@ -32,33 +32,6 @@ func TestBloomGatewayClient(t *testing.T) {
})
}

// TestBloomGatewayClient_SortInstancesByToken verifies that the instance
// sort-merge iterator yields every token of every instance exactly once,
// in ascending token order, paired with the instance that owns the token.
func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) {
	input := []ring.InstanceDesc{
		{Id: "1", Tokens: []uint32{6, 5, 2, 9}},
		{Id: "2", Tokens: []uint32{3, 4, 7}},
		{Id: "3", Tokens: []uint32{1, 8, 0}},
	}
	expected := []instanceWithToken{
		{instance: input[2], token: 0},
		{instance: input[2], token: 1},
		{instance: input[0], token: 2},
		{instance: input[1], token: 3},
		{instance: input[1], token: 4},
		{instance: input[0], token: 5},
		{instance: input[0], token: 6},
		{instance: input[1], token: 7},
		{instance: input[2], token: 8},
		{instance: input[0], token: 9},
	}

	var i int
	it := newInstanceSortMergeIterator(input)
	for it.Next() {
		// Guard the index so that surplus items fail the test instead of
		// panicking with an out-of-range access.
		require.Less(t, i, len(expected), "iterator yielded more items than expected")
		require.Equal(t, expected[i], it.At())
		i++
	}
	// Without this check an iterator that yields too few items (or none at
	// all) would pass the test unnoticed.
	require.Equal(t, len(expected), i, "iterator yielded fewer items than expected")
}

func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
// instance token ranges do not overlap
t.Run("non-overlapping", func(t *testing.T) {
Expand Down Expand Up @@ -203,9 +176,9 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
{Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{2014002871, 315617625, 1036168527}},
}

it := newInstanceSortMergeIterator(instances)
it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
t.Log(it.At().token, it.At().instance.Addr)
t.Log(it.At().MaxToken, it.At().Instance.Addr)
}

testCases := []struct {
Expand Down Expand Up @@ -327,8 +300,8 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
var _ ring.ReadRing = &mockRing{}

func newMockRing(instances []ring.InstanceDesc) *mockRing {
it := newInstanceSortMergeIterator(instances)
ranges := make([]instanceWithToken, 0)
it := bloomutils.NewInstanceSortMergeIterator(instances)
ranges := make([]bloomutils.InstanceWithTokenRange, 0)
for it.Next() {
ranges = append(ranges, it.At())
}
Expand All @@ -340,21 +313,21 @@ func newMockRing(instances []ring.InstanceDesc) *mockRing {

type mockRing struct {
instances []ring.InstanceDesc
ranges []instanceWithToken
ranges []bloomutils.InstanceWithTokenRange
}

// Get implements ring.ReadRing.
func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) {
idx, _ := sort.Find(len(r.ranges), func(i int) int {
if r.ranges[i].token < key {
if r.ranges[i].MaxToken < key {
return 1
}
if r.ranges[i].token > key {
if r.ranges[i].MaxToken > key {
return -1
}
return 0
})
return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].instance}}, nil
return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].Instance}}, nil
}

// GetAllHealthy implements ring.ReadRing.
Expand Down
16 changes: 8 additions & 8 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ type FilterRequest struct {
// taskMergeIterator implements v1.Iterator
type taskMergeIterator struct {
curr FilterRequest
heap *v1.HeapIterator[IndexedValue[*logproto.GroupedChunkRefs]]
heap *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]]
tasks []Task
day time.Time
err error
Expand All @@ -181,14 +181,14 @@ func newTaskMergeIterator(day time.Time, tasks ...Task) v1.PeekingIterator[v1.Re
}

func (it *taskMergeIterator) init() {
sequences := make([]v1.PeekingIterator[IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks))
sequences := make([]v1.PeekingIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks))
for i := range it.tasks {
iter := NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i)
iter := v1.NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i)
sequences = append(sequences, v1.NewPeekingIter(iter))
}
it.heap = v1.NewHeapIterator(
func(i, j IndexedValue[*logproto.GroupedChunkRefs]) bool {
return i.val.Fingerprint < j.val.Fingerprint
func(i, j v1.IndexedValue[*logproto.GroupedChunkRefs]) bool {
return i.Value().Fingerprint < j.Value().Fingerprint
},
sequences...,
)
Expand All @@ -202,10 +202,10 @@ func (it *taskMergeIterator) Next() bool {
}

group := it.heap.At()
task := it.tasks[group.idx]
task := it.tasks[group.Index()]

it.curr.Fp = model.Fingerprint(group.val.Fingerprint)
it.curr.Chks = convertToChunkRefs(group.val.Refs)
it.curr.Fp = model.Fingerprint(group.Value().Fingerprint)
it.curr.Chks = convertToChunkRefs(group.Value().Refs)
it.curr.Searches = convertToSearches(task.Request.Filters)
it.curr.Response = task.ResCh
it.curr.Error = task.ErrCh
Expand Down
61 changes: 0 additions & 61 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,67 +12,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// IndexedValue pairs a value with an integer index.
type IndexedValue[T any] struct {
	idx int // index attached to the value
	val T   // the wrapped value
}

// IterWithIndex wraps a v1.Iterator and attaches a fixed index to every
// element returned by At.
type IterWithIndex[T any] struct {
	v1.Iterator[T]
	zero  T               // zero value of T; not referenced by the code shown here
	cache IndexedValue[T] // holds the fixed index and the most recently read value
}

// At returns the current element of the wrapped iterator together with the
// index the iterator was created with.
func (it *IterWithIndex[T]) At() IndexedValue[T] {
	it.cache = IndexedValue[T]{
		idx: it.cache.idx,
		val: it.Iterator.At(),
	}
	return it.cache
}

// NewIterWithIndex wraps iter so that each of its elements is returned as an
// IndexedValue carrying idx.
func NewIterWithIndex[T any](iter v1.Iterator[T], idx int) v1.Iterator[IndexedValue[T]] {
	wrapped := &IterWithIndex[T]{Iterator: iter}
	wrapped.cache.idx = idx
	return wrapped
}

// SliceIterWithIndex iterates over a slice and returns each element as an
// IndexedValue that carries a fixed index. It additionally supports peeking
// at the element following the current position.
type SliceIterWithIndex[T any] struct {
	xs    []T             // source slice
	pos   int             // position within the slice
	zero  T               // zero value of T
	cache IndexedValue[T] // holds the fixed index and the most recently read value
}

// Next moves to the following position and reports whether that position
// still lies within the slice.
func (it *SliceIterWithIndex[T]) Next() bool {
	it.pos++
	return len(it.xs) > it.pos
}

// Err always returns nil.
func (it *SliceIterWithIndex[T]) Err() error {
	return nil
}

// At returns the element at the current position together with the fixed
// index. The current position must lie within the slice, otherwise the slice
// access panics.
func (it *SliceIterWithIndex[T]) At() IndexedValue[T] {
	it.cache.val = it.xs[it.pos]
	return it.cache
}

// Peek returns the element following the current position without moving the
// position. If there is no such element, it returns the zero value of T
// (still paired with the fixed index) and false.
func (it *SliceIterWithIndex[T]) Peek() (IndexedValue[T], bool) {
	next := it.pos + 1
	if next < len(it.xs) {
		it.cache.val = it.xs[next]
		return it.cache, true
	}
	it.cache.val = it.zero
	return it.cache, false
}

// NewSliceIterWithIndex creates a peeking iterator over xs whose elements all
// carry idx. The position starts at -1, i.e. before the first element.
func NewSliceIterWithIndex[T any](xs []T, idx int) v1.PeekingIterator[IndexedValue[T]] {
	iter := &SliceIterWithIndex[T]{xs: xs, pos: -1}
	iter.cache.idx = idx
	return iter
}

// getDayTime returns midnight (00:00:00) UTC of the UTC calendar day that
// contains the timestamp ts.
func getDayTime(ts model.Time) time.Time {
	// ts.Time() is expressed in the process-local time zone. Convert it to
	// UTC before reading the calendar date; otherwise the local date would be
	// combined with the UTC location and yield the wrong day on hosts whose
	// time zone is not UTC.
	t := ts.Time().UTC()
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}
Expand Down
28 changes: 0 additions & 28 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// TestSliceIterWithIndex checks peeking and advancing on a slice iterator
// created by NewSliceIterWithIndex, including that every returned element
// carries the index passed to the constructor.
func TestSliceIterWithIndex(t *testing.T) {
	t.Run("SliceIterWithIndex implements v1.PeekingIterator interface", func(t *testing.T) {
		const idx = 123
		values := []string{"a", "b", "c"}
		iter := NewSliceIterWithIndex(values, idx)

		// before the first Next, Peek must expose the first item
		first, found := iter.Peek()
		require.True(t, found)
		require.Equal(t, "a", first.val)
		require.Equal(t, idx, first.idx)

		// advancing once must land on that same first item
		require.True(t, iter.Next())
		require.Equal(t, "a", iter.At().val)
		require.Equal(t, idx, iter.At().idx)

		// advance over the second and third item
		require.True(t, iter.Next())
		require.True(t, iter.Next())

		// there is no fourth item to peek at
		beyond, more := iter.Peek()
		require.False(t, more)
		require.Equal(t, "", beyond.val) // "" is zero value for type string
		require.Equal(t, idx, beyond.idx)
	})
}

func TestGetFromThrough(t *testing.T) {
chunks := []*logproto.ShortRef{
{From: 0, Through: 6},
Expand Down
37 changes: 37 additions & 0 deletions pkg/bloomutils/iter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package bloomutils

import (
"io"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

// sortMergeIterator implements v1.Iterator.
//
// Elements are taken from a heap iterator of indexed values. For each
// element, the source item selected by the element's index, the element's
// value and the previously produced result are handed to transform, whose
// return value becomes the new current result.
type sortMergeIterator[T any, C comparable, R any] struct {
	// curr is the result produced by the most recent successful Next.
	curr R
	// heap yields the indexed values to iterate over.
	heap *v1.HeapIterator[v1.IndexedValue[C]]
	// items holds the source items addressed by the index of a heap element.
	items []T
	// transform builds the next result from a source item, a heap value and
	// the previous result.
	transform func(T, C, R) R
	// err is set to io.EOF once the heap iterator is exhausted.
	err error
}

// Next advances to the next heap element and replaces the current result
// with the outcome of transform. When the heap iterator has no more elements
// it records io.EOF as the error and returns false.
func (it *sortMergeIterator[T, C, R]) Next() bool {
	if !it.heap.Next() {
		it.err = io.EOF
		return false
	}

	next := it.heap.At()
	item := it.items[next.Index()]
	it.curr = it.transform(item, next.Value(), it.curr)

	return true
}

// At returns the result stored by the last successful call to Next.
func (it *sortMergeIterator[T, C, R]) At() R {
	return it.curr
}

// Err returns the recorded error; this is io.EOF after Next returned false.
func (it *sortMergeIterator[T, C, R]) Err() error {
	return it.err
}
Loading

0 comments on commit 75e166f

Please sign in to comment.