Skip to content

Commit

Permalink
Make parallel requests in bloom filter e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Jan 23, 2024
1 parent de251c3 commit 19ae79d
Showing 1 changed file with 46 additions and 23 deletions.
69 changes: 46 additions & 23 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -1229,39 +1228,63 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
return successfulRunCount == 1
}, 30*time.Second, time.Second)

// use bloom gateway to perform needle in the haystack queries
randIdx := rand.Intn(len(uniqueStrings))
q := fmt.Sprintf(`{job="varlog"} |= "%s"`, uniqueStrings[randIdx])
start := now.Add(-90 * time.Minute)
end := now.Add(-30 * time.Minute)
resp, err := cliQueryFrontend.RunRangeQueryWithStartEnd(context.Background(), q, start, end)
require.NoError(t, err)

// verify response
require.Len(t, resp.Data.Stream, 1)
expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx])
require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1])
numReq := len(uniqueStrings)
var wg sync.WaitGroup
for x := 0; x < numReq; x++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

// use bloom gateway to perform needle in the haystack queries
q := fmt.Sprintf(`{job="varlog"} |= "%s"`, uniqueStrings[idx])
resp, err := cliQueryFrontend.RunRangeQueryWithStartEnd(ctx, q, start, end)
require.NoError(t, err)

// verify response
require.Len(t, resp.Data.Stream, 1)
expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[idx])
require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1])
}(x)
}

wg.Wait()

// verify metrics that observe usage of block for filtering
bloomGwMetrics, err := cliBloomGateway.Metrics()
require.NoError(t, err)

unfilteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_pre_filtering", bloomGwMetrics)
require.Equal(t, float64(10), unfilteredCount)
require.Equal(t, float64(10*numReq), unfilteredCount)

filteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_post_filtering", bloomGwMetrics)
require.Equal(t, float64(1), filteredCount)

mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics)
require.NoError(t, err)

count := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{
Name: proto.String("status"),
Value: proto.String("success"),
}, func(m *dto.Metric) uint64 {
return m.Histogram.GetSampleCount()
})
require.Equal(t, uint64(1), count)
require.Equal(t, float64(1*numReq), filteredCount)

// TODO(chaudum): histogram metrics do not collect values that are below a certain threshold
// so this check if unreliable
// mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics)
// require.NoError(t, err)

// failedCount := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{
// Name: proto.String("status"),
// Value: proto.String("failure"),
// }, func(m *dto.Metric) uint64 {
// return m.Histogram.GetSampleCount()
// })
// require.Equal(t, uint64(0), failedCount)

// successCount := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{
// Name: proto.String("status"),
// Value: proto.String("success"),
// }, func(m *dto.Metric) uint64 {
// return m.Histogram.GetSampleCount()
// })
// require.Equal(t, uint64(1*numReq), successCount)
}

func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {
Expand Down

0 comments on commit 19ae79d

Please sign in to comment.