diff --git a/pkg/bloomgateway/cache.go b/pkg/bloomgateway/cache.go index 773d4b665787..ad191d6be10e 100644 --- a/pkg/bloomgateway/cache.go +++ b/pkg/bloomgateway/cache.go @@ -108,22 +108,34 @@ func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache. // Merge duplicated fingerprints by: // 1. Sort the chunkRefs by their stream fingerprint -// 2. Append all chunks with the same fingerprint into the first fingerprint's chunk list. +// 2. Remove duplicated FPs appending all chunks into the first fingerprint's chunk list. func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { + if len(chunkRefs) <= 1 { + return chunkRefs + } + sort.Slice(chunkRefs, func(i, j int) bool { return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint }) - return slices.CompactFunc(chunkRefs, func(next, prev *logproto.GroupedChunkRefs) bool { - if next.Fingerprint == prev.Fingerprint { - prev.Refs = mergeShortRefs(append(prev.Refs, next.Refs...)) - return true + + var lastDiffFP int + for i := 1; i < len(chunkRefs); i++ { + if chunkRefs[lastDiffFP].Fingerprint == chunkRefs[i].Fingerprint { + chunkRefs[lastDiffFP].Refs = mergeShortRefs(append(chunkRefs[lastDiffFP].Refs, chunkRefs[i].Refs...)) + } else { + lastDiffFP++ + chunkRefs[lastDiffFP] = chunkRefs[i] } - return false - }) + } + return chunkRefs[:lastDiffFP+1] } // mergeShortRefs merges short-refs by removing duplicated checksums. func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef { + if len(refs) <= 1 { + return refs + } + sort.Slice(refs, func(i, j int) bool { return refs[i].Checksum < refs[j].Checksum }) diff --git a/pkg/bloomgateway/cache_test.go b/pkg/bloomgateway/cache_test.go index 10df735797a5..5a66162000a4 100644 --- a/pkg/bloomgateway/cache_test.go +++ b/pkg/bloomgateway/cache_test.go @@ -242,6 +242,27 @@ func TestMerge(t *testing.T) { }, }, }, + { + ChunkRefs: []*logproto.GroupedChunkRefs{ + // Same FP as in previous input and diff chunks + { + Fingerprint: 2, + Tenant: "fake", + Refs: []*logproto.ShortRef{ + { + From: 700, + Through: 1000, + Checksum: 40, + }, + { + From: 2000, + Through: 2700, + Checksum: 50, + }, + }, + }, + }, + }, }, expected: &logproto.FilterChunkRefResponse{ ChunkRefs: []*logproto.GroupedChunkRefs{ @@ -280,6 +301,16 @@ func TestMerge(t *testing.T) { Through: 2500, Checksum: 30, }, + { + From: 700, + Through: 1000, + Checksum: 40, + }, + { + From: 2000, + Through: 2700, + Checksum: 50, + }, }, }, {