Skip to content

Commit

Permalink
Improve FilterIter test
Browse files: browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Jan 23, 2024
1 parent b32d993 commit 1ef68ed
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 92 deletions.
83 changes: 40 additions & 43 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,61 +74,58 @@ func (t Task) Bounds() (time.Time, time.Time) {
return getDayTime(t.Request.From), getDayTime(t.Request.Through)
}

func (t Task) ChunkIterForDay(day time.Time) v1.Iterator[*logproto.GroupedChunkRefs] {
cf := filterGroupedChunkRefsByDay{day: day}
return &FilterIter[*logproto.GroupedChunkRefs]{
iter: v1.NewSliceIter(t.Request.Refs),
matches: cf.contains,
transform: cf.filter,
}
}
func (t Task) ChunkIterForDay(day time.Time) *FilterIter[*logproto.GroupedChunkRefs] {

type filterGroupedChunkRefsByDay struct {
day time.Time
}

func (cf filterGroupedChunkRefsByDay) contains(a *logproto.GroupedChunkRefs) bool {
from, through := getFromThrough(a.Refs)
if from.Time().After(cf.day.Add(Day)) || through.Time().Before(cf.day) {
return false
containsFn := func(a *logproto.GroupedChunkRefs) bool {
from, through := getFromThrough(a.Refs)
if from.Time().After(day.Add(Day)) || through.Time().Before(day) {
return false
}
return true
}
return true
}

func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs {
minTs, maxTs := getFromThrough(a.Refs)
filterFn := func(a *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs {
fmt.Println("filter", a.Fingerprint)
minTs, maxTs := getFromThrough(a.Refs)

// in most cases, all chunks are within day range
if minTs.Time().Compare(day) >= 0 && maxTs.Time().Before(day.Add(Day)) {
return a
}

// in most cases, all chunks are within day range
if minTs.Time().Compare(cf.day) >= 0 && maxTs.Time().Before(cf.day.Add(Day)) {
return a
// case where certain chunks are outside of day range
// using binary search to get min and max index of chunks that fall into the day range
min := sort.Search(len(a.Refs), func(i int) bool {
start := a.Refs[i].From.Time()
end := a.Refs[i].Through.Time()
return start.Compare(day) >= 0 || end.Compare(day) >= 0
})

max := sort.Search(len(a.Refs), func(i int) bool {
start := a.Refs[i].From.Time()
return start.Compare(day.Add(Day)) > 0
})

return &logproto.GroupedChunkRefs{
Tenant: a.Tenant,
Fingerprint: a.Fingerprint,
Refs: a.Refs[min:max],
}
}

// case where certain chunks are outside of day range
// using binary search to get min and max index of chunks that fall into the day range
min := sort.Search(len(a.Refs), func(i int) bool {
start := a.Refs[i].From.Time()
end := a.Refs[i].Through.Time()
return start.Compare(cf.day) >= 0 || end.Compare(cf.day) >= 0
})

max := sort.Search(len(a.Refs), func(i int) bool {
start := a.Refs[i].From.Time()
return start.Compare(cf.day.Add(Day)) > 0
})

return &logproto.GroupedChunkRefs{
Tenant: a.Tenant,
Fingerprint: a.Fingerprint,
Refs: a.Refs[min:max],
return &FilterIter[*logproto.GroupedChunkRefs]{
iter: v1.NewSliceIter(t.Request.Refs),
match: containsFn,
transform: filterFn,
}
}

// Predicate is the callback type of the FilterIter field `matches`; Next
// skips every element for which it returns false. In this diff view it
// appears alongside Match, which has the identical signature.
type Predicate[T any] func(a T) bool
// Match is the callback type of the FilterIter field `match`; Next skips
// every element for which it returns false.
type Match[T any] func(a T) bool
// Transform maps an element to a value of the same type. ChunkIterForDay
// supplies one that trims a group's refs to the requested day.
type Transform[T any] func(a T) T

type FilterIter[T any] struct {
iter v1.Iterator[T]
matches Predicate[T]
match Match[T]
transform Transform[T]
cache T
zero T // zero value of the return type of Next()
Expand All @@ -140,7 +137,7 @@ func (it *FilterIter[T]) Next() bool {
it.cache = it.zero
return false
}
for next && !it.matches(it.iter.At()) {
for next && !it.match(it.iter.At()) {
next = it.iter.Next()
if !next {
it.cache = it.zero
Expand Down
92 changes: 43 additions & 49 deletions pkg/bloomgateway/multiplexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@ func TestTaskMergeIterator(t *testing.T) {

t.Run("merge multiple tasks in ascending fingerprint order", func(t *testing.T) {
r1 := &logproto.FilterChunkRefRequest{
From: ts.Add(-3 * time.Hour),
Through: ts.Add(-2 * time.Hour),
From: ts.Add(-1 * time.Hour),
Through: ts,
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-3 * time.Hour), Through: ts.Add(-2 * time.Hour), Checksum: 100},
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 200},
}},
{Fingerprint: 300, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 500},
}},
},
}
Expand All @@ -83,7 +86,7 @@ func TestTaskMergeIterator(t *testing.T) {
Through: ts,
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 200},
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 100},
}},
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 300},
Expand Down Expand Up @@ -111,13 +114,13 @@ func TestTaskMergeIterator(t *testing.T) {
require.True(t, it.Next())
r := it.At()
require.Equal(t, model.Fingerprint(100), r.Fp)
require.Equal(t, uint32(100), r.Chks[0].Checksum)
require.Equal(t, uint32(200), r.Chks[0].Checksum)

// second item
require.True(t, it.Next())
r = it.At()
require.Equal(t, model.Fingerprint(100), r.Fp)
require.Equal(t, uint32(200), r.Chks[0].Checksum)
require.Equal(t, uint32(100), r.Chks[0].Checksum)

// third item
require.True(t, it.Next())
Expand All @@ -131,68 +134,55 @@ func TestTaskMergeIterator(t *testing.T) {
require.Equal(t, model.Fingerprint(200), r.Fp)
require.Equal(t, uint32(400), r.Chks[0].Checksum)

// fifth item
require.True(t, it.Next())
r = it.At()
require.Equal(t, model.Fingerprint(300), r.Fp)
require.Equal(t, uint32(500), r.Chks[0].Checksum)

// no more items
require.False(t, it.Next())
})
}

// fpWithChunks is a test helper that builds a GroupedChunkRefs for the given
// tenant and fingerprint. The group holds consecutive one-hour chunk refs,
// the first one starting at fromTs, for every hourly start time earlier than
// fromTs+24h.
func fpWithChunks(tenant string, fp uint64, fromTs model.Time) *logproto.GroupedChunkRefs {
	end := fromTs.Add(24 * time.Hour)
	refs := make([]*logproto.ShortRef, 0, 24)
	for start := fromTs; start < end; start = start.Add(time.Hour) {
		ref := &logproto.ShortRef{From: start, Through: start.Add(time.Hour)}
		refs = append(refs, ref)
	}
	return &logproto.GroupedChunkRefs{
		Fingerprint: fp,
		Tenant:      tenant,
		Refs:        refs,
	}
}

func TestChunkIterForDay(t *testing.T) {
tenant := "fake"

// Thu Nov 09 2023 10:56:50 UTC
ts := model.TimeFromUnix(1699523810)
// Nov 09 2023 10:00 UTC
ts := mktime("2023-11-09 10:00")

t.Run("filter chunk refs that fall into the day range", func(t *testing.T) {
input := &logproto.FilterChunkRefRequest{
From: ts.Add(-168 * time.Hour), // 1w ago
From: ts.Add(-7 * 24 * time.Hour),
Through: ts,
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-168 * time.Hour), Through: ts.Add(-167 * time.Hour), Checksum: 100},
{From: ts.Add(-143 * time.Hour), Through: ts.Add(-142 * time.Hour), Checksum: 101},
}},
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-144 * time.Hour), Through: ts.Add(-143 * time.Hour), Checksum: 200},
{From: ts.Add(-119 * time.Hour), Through: ts.Add(-118 * time.Hour), Checksum: 201},
}},
{Fingerprint: 300, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-120 * time.Hour), Through: ts.Add(-119 * time.Hour), Checksum: 300},
{From: ts.Add(-95 * time.Hour), Through: ts.Add(-94 * time.Hour), Checksum: 301},
}},
{Fingerprint: 400, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-96 * time.Hour), Through: ts.Add(-95 * time.Hour), Checksum: 400},
{From: ts.Add(-71 * time.Hour), Through: ts.Add(-70 * time.Hour), Checksum: 401},
}},
{Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-72 * time.Hour), Through: ts.Add(-71 * time.Hour), Checksum: 500},
{From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501},
}},
{Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600},
{From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour), Checksum: 601},
}},
{Fingerprint: 700, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-24 * time.Hour), Through: ts.Add(-23 * time.Hour), Checksum: 700},
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 701},
}},
fpWithChunks(tenant, 0x0000, ts.Add(-7*24*time.Hour)),
fpWithChunks(tenant, 0x0400, ts.Add(-6*24*time.Hour)),
fpWithChunks(tenant, 0x0800, ts.Add(-5*24*time.Hour)),
fpWithChunks(tenant, 0x0C00, ts.Add(-4*24*time.Hour)),
fpWithChunks(tenant, 0x1000, ts.Add(-3*24*time.Hour)), // matches partially, t-58h .. t-49h
fpWithChunks(tenant, 0x1400, ts.Add(-2*24*time.Hour)), // matches partially, t-49h .. t-34h
fpWithChunks(tenant, 0x1800, ts.Add(-1*24*time.Hour)),
},
Filters: []syntax.LineFilter{
{Ty: labels.MatchEqual, Match: "foo"},
{Ty: labels.MatchEqual, Match: "bar"},
},
}

// day ranges from ts-48h to ts-24h
day := getDayTime(ts.Add(-36 * time.Hour))

expected := []*logproto.GroupedChunkRefs{
{Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501},
}},
{Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600},
}},
}
// Nov 07 2023 22:00 UTC
newTs := ts.Add(-36 * time.Hour)
// Nov 07 2023 00:00 UTC
// from ts-58h to ts-34h
day := getDayTime(newTs)

task, _, _, _ := NewTask(tenant, input)
it := task.ChunkIterForDay(day)
Expand All @@ -202,6 +192,10 @@ func TestChunkIterForDay(t *testing.T) {
output = append(output, it.At())
}

require.Equal(t, expected, output)
require.Len(t, output, 2)
require.Equal(t, output[0].Fingerprint, input.Refs[4].Fingerprint)
require.Len(t, output[0].Refs, 11)
require.Equal(t, output[1].Fingerprint, input.Refs[5].Fingerprint)
require.Len(t, output[1].Refs, 15)
})
}

0 comments on commit 1ef68ed

Please sign in to comment.