From 19ae79da6ed8ffa6816021bb83a469565f2740b4 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 23 Jan 2024 17:22:12 +0100 Subject: [PATCH] Make parallel requests in bloom filter e2e test Signed-off-by: Christian Haudum --- integration/loki_micro_services_test.go | 69 ++++++++++++++++--------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go index da09318fa837..065a9111bfd4 100644 --- a/integration/loki_micro_services_test.go +++ b/integration/loki_micro_services_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "math/rand" "strings" "sync" "testing" @@ -1229,39 +1228,63 @@ func TestBloomFiltersEndToEnd(t *testing.T) { return successfulRunCount == 1 }, 30*time.Second, time.Second) - // use bloom gateway to perform needle in the haystack queries - randIdx := rand.Intn(len(uniqueStrings)) - q := fmt.Sprintf(`{job="varlog"} |= "%s"`, uniqueStrings[randIdx]) start := now.Add(-90 * time.Minute) end := now.Add(-30 * time.Minute) - resp, err := cliQueryFrontend.RunRangeQueryWithStartEnd(context.Background(), q, start, end) - require.NoError(t, err) - // verify response - require.Len(t, resp.Data.Stream, 1) - expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx]) - require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1]) + numReq := len(uniqueStrings) + var wg sync.WaitGroup + for x := 0; x < numReq; x++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + // use bloom gateway to perform needle in the haystack queries + q := fmt.Sprintf(`{job="varlog"} |= "%s"`, uniqueStrings[idx]) + resp, err := cliQueryFrontend.RunRangeQueryWithStartEnd(ctx, q, start, end) + require.NoError(t, err) + + // verify response + require.Len(t, resp.Data.Stream, 1) + expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[idx]) + require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1]) + }(x) + } + + wg.Wait() // verify metrics that observe usage of block for filtering bloomGwMetrics, err := cliBloomGateway.Metrics() require.NoError(t, err) unfilteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_pre_filtering", bloomGwMetrics) - require.Equal(t, float64(10), unfilteredCount) + require.Equal(t, float64(10*numReq), unfilteredCount) filteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_post_filtering", bloomGwMetrics) - require.Equal(t, float64(1), filteredCount) - - mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics) - require.NoError(t, err) - - count := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{ - Name: proto.String("status"), - Value: proto.String("success"), - }, func(m *dto.Metric) uint64 { - return m.Histogram.GetSampleCount() - }) - require.Equal(t, uint64(1), count) + require.Equal(t, float64(1*numReq), filteredCount) + + // TODO(chaudum): histogram metrics do not collect values that are below a certain threshold + // so this check if unreliable + // mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics) + // require.NoError(t, err) + + // failedCount := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{ + // Name: proto.String("status"), + // Value: proto.String("failure"), + // }, func(m *dto.Metric) uint64 { + // return m.Histogram.GetSampleCount() + // }) + // require.Equal(t, uint64(0), failedCount) + + // successCount := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{ + // Name: proto.String("status"), + // Value: proto.String("success"), + // }, func(m *dto.Metric) uint64 { + // return m.Histogram.GetSampleCount() + // }) + // require.Equal(t, uint64(1*numReq), successCount) } func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {