Skip to content

Commit

Permalink
Fix panic in chunk removal
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Jan 24, 2024
1 parent 06bb209 commit b2238de
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
9 changes: 5 additions & 4 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ outer:
continue
}
// we must not remove items from req.Refs as long as the worker may iterater over them
g.removeNotMatchingChunks(req, o)
removeNotMatchingChunks(req, o, g.logger)
}

g.metrics.addUnfilteredCount(numChunksUnfiltered)
Expand All @@ -365,15 +365,15 @@ outer:
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}

func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) {
func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, logger log.Logger) {
// binary search index of fingerprint
idx := sort.Search(len(req.Refs), func(i int) bool {
return req.Refs[i].Fingerprint >= uint64(res.Fp)
})

// fingerprint not found
if idx >= len(req.Refs) {
level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
level.Error(logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
return
}

Expand All @@ -387,10 +387,11 @@ func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, r

for i := range res.Removals {
toRemove := res.Removals[i]
for j := range req.Refs[idx].Refs {
for j := 0; j < len(req.Refs[idx].Refs); j++ {
if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum {
req.Refs[idx].Refs[j] = nil // avoid leaking pointer
req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...)
j-- // since we removed the current item at index, we have to redo the same index
}
}
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,64 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
})
}

func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
t.Run("removing chunks partially", func(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Tenant: "fake", Refs: []*logproto.ShortRef{
{Checksum: 0x1},
{Checksum: 0x2},
{Checksum: 0x3},
{Checksum: 0x4},
{Checksum: 0x5},
}},
},
}
res := v1.Output{
Fp: 0x00, Removals: v1.ChunkRefs{
{Checksum: 0x2},
{Checksum: 0x4},
},
}
expected := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Tenant: "fake", Refs: []*logproto.ShortRef{
{Checksum: 0x1},
{Checksum: 0x3},
{Checksum: 0x5},
}},
},
}
removeNotMatchingChunks(req, res, log.NewNopLogger())
require.Equal(t, expected, req)
})

t.Run("removing all chunks removed fingerprint ref", func(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Tenant: "fake", Refs: []*logproto.ShortRef{
{Checksum: 0x1},
{Checksum: 0x2},
{Checksum: 0x3},
}},
},
}
res := v1.Output{
Fp: 0x00, Removals: v1.ChunkRefs{
{Checksum: 0x1},
{Checksum: 0x2},
{Checksum: 0x2},
},
}
expected := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{},
}
removeNotMatchingChunks(req, res, log.NewNopLogger())
require.Equal(t, expected, req)
})

}

func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockQuerierWithFingerprintRange, [][]v1.SeriesWithBloom) {
t.Helper()
step := (maxFp - minFp) / model.Fingerprint(numBlocks)
Expand Down

0 comments on commit b2238de

Please sign in to comment.