Skip to content

Commit

Permalink
panic investigation 3
Browse files Browse the repository at this point in the history
Signed-off-by: Vladyslav Diachenko <[email protected]>
  • Loading branch information
vlad-diachenko committed Jan 23, 2024
1 parent 3d53d43 commit b83f276
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}

g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())
level.Info(g.logger).Log("msg", "enqueue task", "task", task.ID, "closed", task.closed)
logger := log.With(g.logger, "origin_task_id", task.ID)
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "closed", task.closed)
g.queue.Enqueue(tenantID, []string{}, task, func() {
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
Expand All @@ -336,19 +337,21 @@ outer:
for {
select {
case <-ctx.Done():
level.Info(logger).Log("msg", "context done for the task", "task", task.ID, "err", ctx.Err())
// stop forwarding items from the sender to the receiver
// because it won't be consumed any more
task.Cancel()
return nil, errors.Wrap(ctx.Err(), "waiting for results")
case err := <-errCh:
level.Info(logger).Log("msg", "error for the task", "task", task.ID, "err", ctx.Err())
// stop forwarding items from the sender to the receiver
// because it won't be consumed any more
task.Cancel()
return nil, errors.Wrap(err, "waiting for results")
case res := <-resCh:
responses = append(responses, res)
// log line is helpful for debugging tests
level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
level.Debug(logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
// wait for all parts of the full response
if len(responses) == requestCount {
break outer
Expand Down

0 comments on commit b83f276

Please sign in to comment.