Skip to content

Commit

Permalink
Fix bug with compact
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Dec 5, 2023
1 parent cdad32d commit 58d1bfa
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
26 changes: 19 additions & 7 deletions pkg/bloomgateway/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,22 +108,34 @@ func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.

// Merge duplicated fingerprints by:
// 1. Sort the chunkRefs by their stream fingerprint
// 2. Append all chunks with the same fingerprint into the first fingerprint's chunk list.
// 2. Remove duplicated FPs appending all chunks into the first fingerprint's chunk list.
func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
if len(chunkRefs) <= 1 {
return chunkRefs
}

sort.Slice(chunkRefs, func(i, j int) bool {
return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
})
return slices.CompactFunc(chunkRefs, func(next, prev *logproto.GroupedChunkRefs) bool {
if next.Fingerprint == prev.Fingerprint {
prev.Refs = mergeShortRefs(append(prev.Refs, next.Refs...))
return true

var lastDiffFP int
for i := 1; i < len(chunkRefs); i++ {
if chunkRefs[lastDiffFP].Fingerprint == chunkRefs[i].Fingerprint {
chunkRefs[lastDiffFP].Refs = mergeShortRefs(append(chunkRefs[lastDiffFP].Refs, chunkRefs[i].Refs...))
} else {
lastDiffFP++
chunkRefs[lastDiffFP] = chunkRefs[i]
}
return false
})
}
return chunkRefs[:lastDiffFP+1]
}

// mergeShortRefs merges short-refs by removing duplicated checksums.
func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef {
if len(refs) <= 1 {
return refs
}

sort.Slice(refs, func(i, j int) bool {
return refs[i].Checksum < refs[j].Checksum
})
Expand Down
31 changes: 31 additions & 0 deletions pkg/bloomgateway/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,27 @@ func TestMerge(t *testing.T) {
},
},
},
{
ChunkRefs: []*logproto.GroupedChunkRefs{
// Same FP as in previous input and diff chunks
{
Fingerprint: 2,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 700,
Through: 1000,
Checksum: 40,
},
{
From: 2000,
Through: 2700,
Checksum: 50,
},
},
},
},
},
},
expected: &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{
Expand Down Expand Up @@ -280,6 +301,16 @@ func TestMerge(t *testing.T) {
Through: 2500,
Checksum: 30,
},
{
From: 700,
Through: 1000,
Checksum: 40,
},
{
From: 2000,
Through: 2700,
Checksum: 50,
},
},
},
{
Expand Down

0 comments on commit 58d1bfa

Please sign in to comment.